Skip to content

Commit 945c57e

Browse files
Extract type information constants
1 parent c8ad026 commit 945c57e

File tree

5 files changed

+25
-5
lines changed

5 files changed

+25
-5
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package org.apache.flinkx
2+
3+
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
4+
5+
package object api {
6+
7+
/** Basic type has an arity of 1. See [[BasicTypeInfo#getArity()]] */
8+
private[api] val BasicTypeArity: Int = 1
9+
10+
/** Basic type has 1 field. See [[BasicTypeInfo#getTotalFields()]] */
11+
private[api] val BasicTypeTotalFields: Int = 1
12+
13+
/** Documentation of [[TypeInformation#getTotalFields()]] states the total number of fields must be at least 1. */
14+
private[api] val MinimumTotalFields: Int = 1
15+
16+
}

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/CaseClassTypeInfo.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.typeutils.CompositeType.{
3030
}
3131
import org.apache.flink.api.common.typeutils._
3232
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
33+
import org.apache.flinkx.api.MinimumTotalFields
3334

3435
import java.util.regex.{Matcher, Pattern}
3536
import scala.annotation.tailrec
@@ -64,7 +65,7 @@ class CaseClassTypeInfo[T <: Product](
6465
private val PATTERN_INT_FIELD: Pattern = Pattern.compile(REGEX_INT_FIELD)
6566

6667
override def getTotalFields: Int =
67-
if (super.getTotalFields == 0) 1 else super.getTotalFields // The total number of fields must be at least 1.
68+
if (super.getTotalFields == 0) MinimumTotalFields else super.getTotalFields
6869

6970
@PublicEvolving
7071
def getFieldIndices(fields: Array[String]): Array[Int] = {

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/CollectionTypeInformation.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import org.apache.flink.api.common.ExecutionConfig
44
import org.apache.flink.api.common.serialization.SerializerConfig
55
import org.apache.flink.api.common.typeinfo.TypeInformation
66
import org.apache.flink.api.common.typeutils.TypeSerializer
7+
import org.apache.flinkx.api.MinimumTotalFields
78

89
import scala.reflect.{ClassTag, classTag}
910

@@ -17,7 +18,7 @@ case class CollectionTypeInformation[T: ClassTag](serializer: TypeSerializer[T])
1718
override def isBasicType: Boolean = false
1819
override def isTupleType: Boolean = false
1920
override def isKeyType: Boolean = false
20-
override def getTotalFields: Int = 1
21+
override def getTotalFields: Int = MinimumTotalFields
2122
override def getTypeClass: Class[T] = clazz
2223
override def getArity: Int = 0
2324
}

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/CoproductTypeInformation.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import org.apache.flink.api.common.ExecutionConfig
44
import org.apache.flink.api.common.serialization.SerializerConfig
55
import org.apache.flink.api.common.typeinfo.TypeInformation
66
import org.apache.flink.api.common.typeutils.TypeSerializer
7+
import org.apache.flinkx.api.MinimumTotalFields
78

89
case class CoproductTypeInformation[T](c: Class[T], ser: TypeSerializer[T]) extends TypeInformation[T] {
910

@@ -14,7 +15,7 @@ case class CoproductTypeInformation[T](c: Class[T], ser: TypeSerializer[T]) exte
1415
override def isBasicType: Boolean = false
1516
override def isTupleType: Boolean = false
1617
override def isKeyType: Boolean = false
17-
override def getTotalFields: Int = 1
18+
override def getTotalFields: Int = MinimumTotalFields
1819
override def getTypeClass: Class[T] = c
1920
override def getArity: Int = 0
2021
}

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/UnitTypeInformation.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import org.apache.flink.api.common.serialization.SerializerConfig
55
import org.apache.flink.api.common.typeinfo.TypeInformation
66
import org.apache.flink.api.common.typeutils.TypeSerializer
77
import org.apache.flinkx.api.serializer.UnitSerializer
8+
import org.apache.flinkx.api.{BasicTypeArity, BasicTypeTotalFields}
89

910
class UnitTypeInformation extends TypeInformation[Unit] {
1011

@@ -13,11 +14,11 @@ class UnitTypeInformation extends TypeInformation[Unit] {
1314
def createSerializer(config: ExecutionConfig): TypeSerializer[Unit] = new UnitSerializer()
1415

1516
override def isKeyType: Boolean = false
16-
override def getTotalFields: Int = 1 // The total number of fields must be at least 1.
17+
override def getTotalFields: Int = BasicTypeTotalFields
1718
override def isTupleType: Boolean = false
1819
override def canEqual(obj: Any): Boolean = obj.isInstanceOf[UnitTypeInformation]
1920
override def getTypeClass: Class[Unit] = classOf[Unit]
20-
override def getArity: Int = 1 // Basic type has an arity of 1
21+
override def getArity: Int = BasicTypeArity
2122
override def isBasicType: Boolean = true
2223

2324
override def toString: String = "{}"

0 commit comments

Comments
 (0)