Skip to content

Commit fdaaef7

Browse files
review: Fix checks on arity and totalFields
1 parent 9d1e5ee commit fdaaef7

File tree

5 files changed

+23
-11
lines changed

5 files changed

+23
-11
lines changed

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/CaseClassTypeInfo.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ class CaseClassTypeInfo[T <: Product](
6363
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD)
6464
private val PATTERN_INT_FIELD: Pattern = Pattern.compile(REGEX_INT_FIELD)
6565

66+
override def getTotalFields: Int =
67+
if (super.getTotalFields == 0) 1 else super.getTotalFields // The total number of fields must be at least 1.
68+
6669
@PublicEvolving
6770
def getFieldIndices(fields: Array[String]): Array[Int] = {
6871
fields map { x => fieldNames.indexOf(x) }

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/CollectionTypeInformation.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,5 @@ case class CollectionTypeInformation[T: ClassTag](serializer: TypeSerializer[T])
1919
override def isKeyType: Boolean = false
2020
override def getTotalFields: Int = 1
2121
override def getTypeClass: Class[T] = clazz
22-
override def getArity: Int = 1
22+
override def getArity: Int = 0
2323
}

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/CoproductTypeInformation.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ case class CoproductTypeInformation[T](c: Class[T], ser: TypeSerializer[T]) exte
1616
override def isKeyType: Boolean = false
1717
override def getTotalFields: Int = 1
1818
override def getTypeClass: Class[T] = c
19-
override def getArity: Int = 1
19+
override def getArity: Int = 0
2020
}

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/UnitTypeInformation.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ class UnitTypeInformation extends TypeInformation[Unit] {
1313
def createSerializer(config: ExecutionConfig): TypeSerializer[Unit] = new UnitSerializer()
1414

1515
override def isKeyType: Boolean = false
16-
override def getTotalFields: Int = 0
16+
override def getTotalFields: Int = 1 // The total number of fields must be at least 1.
1717
override def isTupleType: Boolean = false
1818
override def canEqual(obj: Any): Boolean = obj.isInstanceOf[UnitTypeInformation]
1919
override def getTypeClass: Class[Unit] = classOf[Unit]
20-
override def getArity: Int = 0
20+
override def getArity: Int = 1 // Basic type has an arity of 1
2121
override def isBasicType: Boolean = true
2222

2323
override def toString: String = "{}"

modules/flink-common-api/src/test/scala/org/apache/flinkx/api/TestUtils.scala

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.apache.flinkx.api
22

3-
import org.apache.flink.api.common.serialization.SerializerConfigImpl
43
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
54
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
65
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
@@ -154,9 +153,15 @@ trait TestUtils extends Matchers with Inspectors {
154153

155154
private def isNotBitmapField(field: Field): Boolean = !field.getName.startsWith("bitmap$")
156155

156+
private def isNotOverriddenField(field: Field, childFields: Array[Field]): Boolean =
157+
!childFields.exists(_.getName == field.getName)
158+
157159
@tailrec
158160
private def getAllFields[T](typeClass: Class[T], fields: Array[Field] = Array.empty): Array[Field] = {
159-
val allFields = fields ++ typeClass.getDeclaredFields.filter(isInstanceField).filter(isNotBitmapField)
161+
val allFields = fields ++ typeClass.getDeclaredFields
162+
.filter(isInstanceField)
163+
.filter(isNotBitmapField)
164+
.filter(isNotOverriddenField(_, fields))
160165
val superClass = typeClass.getSuperclass
161166
if (superClass == null || superClass == classOf[Object]) {
162167
allFields
@@ -195,9 +200,13 @@ trait TestUtils extends Matchers with Inspectors {
195200
}
196201

197202
def checkTotalFields[T](info: TypeInformation[T]): Assertion = {
198-
val typeClass = info.getTypeClass
199-
val totalFields = getTotalFields(typeClass)
200-
withClue("getTotalFields:")(info.getTotalFields shouldBe totalFields)
203+
val typeClass = info.getTypeClass
204+
if (typeClass.getTypeParameters.isEmpty) {
205+
val totalFields = getTotalFields(typeClass)
206+
withClue("getTotalFields:")(info.getTotalFields shouldBe totalFields)
207+
} else {
208+
succeed // cannot check getTotalFields for generic types.
209+
}
201210
}
202211

203212
def checkKeyType[T](info: TypeInformation[T]): Assertion = info match {
@@ -230,8 +239,8 @@ trait TestUtils extends Matchers with Inspectors {
230239
}
231240
checkBasicType(infoToCheck)
232241
checkTupleType(infoToCheck)
233-
// checkArity(infoToCheck)
234-
// checkTotalFields(infoToCheck)
242+
checkArity(infoToCheck)
243+
checkTotalFields(infoToCheck)
235244
checkKeyType(infoToCheck)
236245
checkToString(infoToCheck)
237246
checkEquals(infoToCheck)

0 commit comments

Comments
 (0)