Skip to content

Commit d1c384d

Browse files
feature: handle null field in case class (#267)
* feature: handle null field in case class * fix 4 bytes missing in fixed length case class * fix null value handling to respect the fixed length
1 parent 194aa36 commit d1c384d

File tree

9 files changed

+103
-19
lines changed

9 files changed

+103
-19
lines changed

README.md

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ From Flink 1.19, a check is done to detect this misusage. To disable it, see [Di
209209
210210
### Flink ADT
211211

212-
To derive a TypeInformation for a sealed trait, you can do:
212+
To derive a TypeInformation for a case class or sealed trait, you can do:
213213

214214
```scala mdoc:reset-object
215215
import org.apache.flinkx.api.serializers._
@@ -227,6 +227,31 @@ object Event {
227227

228228
Be careful with a wildcard import of import `org.apache.flink.api.scala._`: it has a `createTypeInformation` implicit function, which may happily generate you a kryo-based serializer in a place you never expected. So in a case if you want to do this type of wildcard import, make sure that you explicitly called `deriveTypeInformation` for all the sealed traits in the current scope.
229229

230+
#### Null value handling
231+
232+
A case class can be null, the case class serializer natively handles the null case.
233+
234+
A case class field can also be null, either:
235+
- the serializer of this field natively handles its nullability.
236+
- the field must be annotated with `@nullable` in order to be wrapped in Flink's `NullableSerializer`.
237+
238+
In any cases, it can be a good hint to use `@nullable` annotation to indicate when fields are meant to be nullable.
239+
240+
```scala mdoc:reset-object
241+
import org.apache.flinkx.api.serializers._
242+
import org.apache.flinkx.api.serializer.nullable
243+
import org.apache.flink.api.common.typeinfo.TypeInformation
244+
245+
case class Click(id: String, clickEvent: ClickEvent)
246+
247+
case class ClickEvent(
248+
@nullable history: Array[String], // @nullable allows to handle null array
249+
@nullable id: String) // Effectless here as null strings are natively handled
250+
251+
Click("id1", null) // A case class can be null
252+
Click("id2", ClickEvent(null, null)) // Valid thanks to @nullable
253+
```
254+
230255
### Java types
231256

232257
Built-in serializers are for Scala language abstractions and won't derive `TypeInformation` for Java classes (as they don't extend the `scala.Product` type). But you can always fall back to Flink's own POJO serializer in this way, so just make it implicit so this API can pick it up:

modules/flink-1-api/src/main/scala-2/org/apache/flinkx/api/LowPrioImplicits.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ package org.apache.flinkx.api
33
import magnolia1.{CaseClass, Magnolia, SealedTrait}
44
import org.apache.flink.api.common.ExecutionConfig
55
import org.apache.flink.api.common.typeinfo.TypeInformation
6-
import org.apache.flinkx.api.serializer.{CoproductSerializer, CaseClassSerializer, ScalaCaseObjectSerializer}
6+
import org.apache.flink.api.java.typeutils.runtime.NullableSerializer
7+
import org.apache.flinkx.api.serializer.{CaseClassSerializer, CoproductSerializer, ScalaCaseObjectSerializer, nullable}
78
import org.apache.flinkx.api.typeinfo.{CoproductTypeInformation, ProductTypeInformation}
89
import org.apache.flinkx.api.util.ClassUtil.isCaseClassImmutable
910

@@ -32,7 +33,12 @@ private[api] trait LowPrioImplicits {
3233
} else {
3334
new CaseClassSerializer[T](
3435
clazz = clazz,
35-
scalaFieldSerializers = ctx.parameters.map(_.typeclass.createSerializer(config)).toArray,
36+
scalaFieldSerializers = ctx.parameters.map { p =>
37+
val ser = p.typeclass.createSerializer(config)
38+
if (p.annotations.exists(_.isInstanceOf[nullable])) {
39+
NullableSerializer.wrapIfNullIsNotSupported(ser, true)
40+
} else ser
41+
}.toArray,
3642
isCaseClassImmutable = isCaseClassImmutable(clazz, ctx.parameters.map(_.label))
3743
)
3844
}

modules/flink-1-api/src/main/scala-3/org/apache/flinkx/api/LowPrioImplicits.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ import magnolia1.{CaseClass, SealedTrait}
88
import org.apache.flink.api.common.typeinfo.TypeInformation
99
import org.apache.flink.api.common.typeutils.TypeSerializer
1010
import org.apache.flink.api.common.ExecutionConfig
11-
import org.apache.flinkx.api.serializer.{CaseClassSerializer, CoproductSerializer, ScalaCaseObjectSerializer}
11+
import org.apache.flink.api.java.typeutils.runtime.NullableSerializer
12+
import org.apache.flinkx.api.serializer.{CaseClassSerializer, CoproductSerializer, ScalaCaseObjectSerializer, nullable}
1213
import org.apache.flinkx.api.typeinfo.{CoproductTypeInformation, ProductTypeInformation}
1314
import org.apache.flinkx.api.util.ClassUtil.isCaseClassImmutable
1415

@@ -38,8 +39,12 @@ private[api] trait LowPrioImplicits extends TaggedDerivation[TypeInformation]:
3839
else
3940
new CaseClassSerializer[T & Product](
4041
clazz = clazz,
41-
scalaFieldSerializers =
42-
IArray.genericWrapArray(ctx.params.map(_.typeclass.createSerializer(config))).toArray,
42+
scalaFieldSerializers = IArray.genericWrapArray(ctx.params.map { p =>
43+
val ser = p.typeclass.createSerializer(config)
44+
if (p.annotations.exists(_.isInstanceOf[nullable])) {
45+
NullableSerializer.wrapIfNullIsNotSupported(ser, true)
46+
} else ser
47+
}).toArray,
4348
isCaseClassImmutable = isCaseClassImmutable(clazz, ctx.params.map(_.label))
4449
)
4550
val ti = new ProductTypeInformation[T & Product](

modules/flink-1-api/src/test/scala/org/apache/flinkx/api/SchemaEvolutionTest.scala

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package org.apache.flinkx.api
33
import org.apache.flink.api.common.ExecutionConfig
44
import org.apache.flink.api.common.typeinfo.TypeInformation
55
import org.apache.flink.core.memory._
6-
import org.apache.flinkx.api.SchemaEvolutionTest.{Click, ClickEvent, Event, View}
6+
import org.apache.flinkx.api.SchemaEvolutionTest.{Click, ClickEvent, Event}
77
import org.apache.flinkx.api.serializer.CaseClassSerializer
88
import org.apache.flinkx.api.serializers._
99
import org.scalatest.flatspec.AnyFlatSpec
@@ -13,11 +13,9 @@ import java.io.ByteArrayOutputStream
1313
import java.nio.file.{Files, Path}
1414

1515
class SchemaEvolutionTest extends AnyFlatSpec with Matchers {
16-
private implicit val clickEventTypeInfo: TypeInformation[ClickEvent] = deriveTypeInformation[ClickEvent]
17-
private implicit val viewInfo: TypeInformation[View] = deriveTypeInformation[View]
18-
private implicit val newClickTypeInfo: TypeInformation[Click] = deriveTypeInformation[Click]
19-
private implicit val eventTypeInfo: TypeInformation[Event] = deriveTypeInformation[Event]
20-
private val clicks =
16+
private implicit val newClickTypeInfo: TypeInformation[Click] = deriveTypeInformation[Click]
17+
private implicit val eventTypeInfo: TypeInformation[Event] = deriveTypeInformation[Event]
18+
private val clicks =
2119
List(ClickEvent("a", "2021-01-01"), ClickEvent("b", "2021-01-01"), ClickEvent("c", "2021-01-01"))
2220

2321
def createSerializer[T: TypeInformation] =

modules/flink-1-api/src/test/scala/org/apache/flinkx/api/SerializerTest.scala

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@ package org.apache.flinkx.api
33
import cats.data.NonEmptyList
44
import org.apache.flink.api.common.ExecutionConfig
55
import org.apache.flink.api.common.typeinfo.TypeInformation
6+
import org.apache.flink.api.common.typeutils.base.StringSerializer
7+
import org.apache.flink.api.java.typeutils.runtime.NullableSerializer
68
import org.apache.flinkx.api.SerializerTest.DeeplyNested.ModeNested.SuperNested.{Egg, Food}
79
import org.apache.flinkx.api.SerializerTest.NestedRoot.NestedMiddle.NestedBottom
810
import org.apache.flinkx.api.SerializerTest._
911
import org.apache.flinkx.api.serializers._
12+
import org.apache.flinkx.api.serializer.{CaseClassSerializer, nullable}
1013
import org.scalatest.Inspectors
1114
import org.scalatest.flatspec.AnyFlatSpec
1215
import org.scalatest.matchers.should.Matchers
@@ -204,6 +207,11 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors with Test
204207
roundtrip(ser, ExtendingCaseClass("abc", "def"))
205208
}
206209

210+
it should "serialize a null case class" in {
211+
val ser = implicitly[TypeInformation[Simple]].createSerializer(ec)
212+
roundtrip(ser, null)
213+
}
214+
207215
it should "serialize a case class with nullable field" in {
208216
val ser = implicitly[TypeInformation[NullableField]].createSerializer(ec)
209217
roundtrip(ser, NullableField(null, Bar(1)))
@@ -214,12 +222,26 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors with Test
214222
roundtrip(ser, NullableFieldWithNoArity(null))
215223
}
216224

225+
it should "serialize nullable fields" in {
226+
val ser = implicitly[TypeInformation[SimpleJava]].createSerializer(ec)
227+
roundtrip(ser, SimpleJava(null, null))
228+
val ccser = ser.asInstanceOf[CaseClassSerializer[SimpleJava]]
229+
// IntSerializer doesn't handle null so it's wrapped in a NullableSerializer
230+
ccser.getFieldSerializers()(0) shouldBe a[NullableSerializer[Integer]]
231+
ccser.getFieldSerializers()(1) shouldBe a[StringSerializer] // StringSerializer natively handles null
232+
}
233+
234+
it should "serialize a case class with a nullable field of a fixed size case class" in {
235+
val ser = implicitly[TypeInformation[NullableFixedSizeCaseClass]].createSerializer(ec)
236+
roundtrip(ser, NullableFixedSizeCaseClass(null))
237+
}
238+
217239
}
218240

219241
object SerializerTest {
220242
case class Simple(a: Int, b: String)
221243
case class SimpleList(a: List[Int])
222-
case class SimpleJava(a: Integer, b: String)
244+
case class SimpleJava(@nullable a: Integer, @nullable b: String)
223245
case class JavaTime(a: Instant, b: LocalDate, c: LocalDateTime)
224246
case class Nested(a: Simple)
225247

@@ -296,4 +318,6 @@ object SerializerTest {
296318

297319
final case class NullableFieldWithNoArity(var a: NoArity)
298320

321+
final case class NullableFixedSizeCaseClass(@nullable javaTime: JavaTime)
322+
299323
}

modules/flink-2-api/src/main/scala-2/org/apache/flinkx/api/LowPrioImplicits.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ package org.apache.flinkx.api
33
import magnolia1.{CaseClass, Magnolia, SealedTrait}
44
import org.apache.flink.api.common.serialization.SerializerConfig
55
import org.apache.flink.api.common.typeinfo.TypeInformation
6-
import org.apache.flinkx.api.serializer.{CoproductSerializer, CaseClassSerializer, ScalaCaseObjectSerializer}
6+
import org.apache.flink.api.java.typeutils.runtime.NullableSerializer
7+
import org.apache.flinkx.api.serializer.{CaseClassSerializer, CoproductSerializer, ScalaCaseObjectSerializer, nullable}
78
import org.apache.flinkx.api.typeinfo.{CoproductTypeInformation, ProductTypeInformation}
89
import org.apache.flinkx.api.util.ClassUtil.isCaseClassImmutable
910

@@ -32,7 +33,12 @@ private[api] trait LowPrioImplicits {
3233
} else {
3334
new CaseClassSerializer[T](
3435
clazz = clazz,
35-
scalaFieldSerializers = ctx.parameters.map(_.typeclass.createSerializer(config)).toArray,
36+
scalaFieldSerializers = ctx.parameters.map { p =>
37+
val ser = p.typeclass.createSerializer(config)
38+
if (p.annotations.exists(_.isInstanceOf[nullable])) {
39+
NullableSerializer.wrapIfNullIsNotSupported(ser, true)
40+
} else ser
41+
}.toArray,
3642
isCaseClassImmutable = isCaseClassImmutable(clazz, ctx.parameters.map(_.label))
3743
)
3844
}

modules/flink-2-api/src/main/scala-3/org/apache/flinkx/api/LowPrioImplicits.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ import scala.collection.mutable
44
import scala.compiletime.summonInline
55
import scala.deriving.Mirror
66
import scala.reflect.ClassTag
7-
87
import magnolia1.{CaseClass, SealedTrait}
98
import org.apache.flink.api.common.typeinfo.TypeInformation
109
import org.apache.flink.api.common.typeutils.TypeSerializer
1110
import org.apache.flink.api.common.serialization.SerializerConfig
12-
import org.apache.flinkx.api.serializer.{CoproductSerializer, CaseClassSerializer, ScalaCaseObjectSerializer}
11+
import org.apache.flink.api.java.typeutils.runtime.NullableSerializer
12+
import org.apache.flinkx.api.serializer.{CaseClassSerializer, CoproductSerializer, ScalaCaseObjectSerializer, nullable}
1313
import org.apache.flinkx.api.typeinfo.{CoproductTypeInformation, ProductTypeInformation}
1414
import org.apache.flinkx.api.util.ClassUtil.isCaseClassImmutable
1515

@@ -39,8 +39,12 @@ private[api] trait LowPrioImplicits extends TaggedDerivation[TypeInformation]:
3939
else
4040
new CaseClassSerializer[T & Product](
4141
clazz = clazz,
42-
scalaFieldSerializers =
43-
IArray.genericWrapArray(ctx.params.map(_.typeclass.createSerializer(config))).toArray,
42+
scalaFieldSerializers = IArray.genericWrapArray(ctx.params.map { p =>
43+
val ser = p.typeclass.createSerializer(config)
44+
if (p.annotations.exists(_.isInstanceOf[nullable])) {
45+
NullableSerializer.wrapIfNullIsNotSupported(ser, true)
46+
} else ser
47+
}).toArray,
4448
isCaseClassImmutable = isCaseClassImmutable(clazz, ctx.params.map(_.label))
4549
)
4650
val ti = new ProductTypeInformation[T & Product](

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnap
2222
import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
2323
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
2424
import org.apache.flink.types.NullFieldException
25+
import org.apache.flinkx.api.serializer.CaseClassSerializer.EmptyByteArray
2526
import org.slf4j.{Logger, LoggerFactory}
2627

2728
/** Serializer for Case Classes. Creation and access is different from our Java Tuples so we have to treat them
@@ -39,6 +40,8 @@ class CaseClassSerializer[T <: Product](
3940

4041
@transient private lazy val log: Logger = LoggerFactory.getLogger(this.getClass)
4142

43+
private val nullPadding: Array[Byte] = if (super.getLength > 0) new Array(super.getLength) else EmptyByteArray
44+
4245
override val isImmutableType: Boolean = isCaseClassImmutable &&
4346
fieldSerializers.forall(Option(_).exists(_.isImmutableType))
4447
val isImmutableSerializer: Boolean =
@@ -92,10 +95,13 @@ class CaseClassSerializer[T <: Product](
9295
createInstance(fields.toArray)
9396
}
9497

98+
override val getLength: Int = if (super.getLength == -1) -1 else super.getLength + 4 // +4 bytes for the arity field
99+
95100
def serialize(value: T, target: DataOutputView): Unit = {
96101
// Write an arity of -1 to indicate null value
97102
val sourceArity = if (value == null) -1 else arity
98103
target.writeInt(sourceArity)
104+
if (value == null) target.write(nullPadding)
99105

100106
(0 until sourceArity).foreach { i =>
101107
val serializer = fieldSerializers(i).asInstanceOf[TypeSerializer[Any]]
@@ -114,6 +120,7 @@ class CaseClassSerializer[T <: Product](
114120
def deserialize(source: DataInputView): T = {
115121
val sourceArity = source.readInt()
116122
if (sourceArity == -1) {
123+
source.skipBytesToRead(nullPadding.length)
117124
null.asInstanceOf[T]
118125
} else {
119126
val fields = new Array[AnyRef](sourceArity)
@@ -134,3 +141,7 @@ class CaseClassSerializer[T <: Product](
134141
new ScalaCaseClassSerializerSnapshot[T](this)
135142

136143
}
144+
145+
object CaseClassSerializer {
146+
private val EmptyByteArray: Array[Byte] = new Array(0)
147+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.apache.flinkx.api.serializer
2+
3+
import scala.annotation.StaticAnnotation
4+
5+
final class nullable extends StaticAnnotation

0 commit comments

Comments
 (0)