Skip to content

Commit bf0d998

Browse files
Improve checks on serializer: instance and binary copy
Add checks on type information Add Scalatest matchers "haveTypeInfo" and beSerializable
1 parent 4908580 commit bf0d998

File tree

4 files changed

+328
-118
lines changed

4 files changed

+328
-118
lines changed

modules/flink-common-api/src/test/scala/org/apache/flinkx/api/AnyTest.scala

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,30 @@
11
package org.apache.flinkx.api
22

33
import cats.data.NonEmptyList
4-
import org.apache.flink.api.common.serialization.SerializerConfigImpl
54
import org.apache.flink.api.common.typeinfo.TypeInformation
5+
import org.apache.flinkx.api.AnyTest._
66
import org.apache.flinkx.api.AnyTest.FAny.FValueAny.FTerm
77
import org.apache.flinkx.api.AnyTest.FAny.FValueAny.FTerm.StringTerm
8-
import org.apache.flinkx.api.AnyTest._
98
import org.apache.flinkx.api.serializers._
109
import org.scalatest.flatspec.AnyFlatSpec
1110
import org.scalatest.matchers.should.Matchers
1211

1312
class AnyTest extends AnyFlatSpec with Matchers with TestUtils {
14-
val ec = new SerializerConfigImpl()
15-
16-
def createSerializer[T: TypeInformation] =
17-
implicitly[TypeInformation[T]].createSerializer(ec)
1813

1914
it should "serialize concrete class" in {
20-
val ser = createSerializer[StringTerm]
21-
roundtrip(ser, StringTerm("fo"))
15+
StringTerm("fo") should haveTypeInfoAndBeSerializable[StringTerm]
2216
}
2317

2418
it should "serialize ADT" in {
25-
val ser = createSerializer[FAny]
26-
roundtrip(ser, StringTerm("fo"))
19+
StringTerm("fo") should haveTypeInfoAndBeSerializable[FAny](nullable = false)
2720
}
2821

2922
it should "serialize NEL" in {
30-
val ser = createSerializer[NonEmptyList[FTerm]]
31-
roundtrip(ser, NonEmptyList.one(StringTerm("fo")))
23+
NonEmptyList.one(StringTerm("fo")) should haveTypeInfoAndBeSerializable[NonEmptyList[FTerm]]
3224
}
3325

3426
it should "serialize nested nel" in {
35-
val ser = createSerializer[TermFilter]
36-
roundtrip(ser, TermFilter("a", NonEmptyList.one(StringTerm("fo"))))
27+
TermFilter("a", NonEmptyList.one(StringTerm("fo"))) should haveTypeInfoAndBeSerializable[TermFilter]
3728
}
3829
}
3930

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,18 @@
11
package org.apache.flinkx.api
22

33
import cats.data.NonEmptyList
4-
import org.apache.flink.api.common.serialization.SerializerConfigImpl
54
import org.apache.flink.api.common.typeinfo.TypeInformation
65
import org.apache.flinkx.api.serializers._
76
import org.scalatest.flatspec.AnyFlatSpec
87
import org.scalatest.matchers.should.Matchers
98

109
class CatsTest extends AnyFlatSpec with Matchers with TestUtils {
11-
implicit val stringListTi: TypeInformation[NonEmptyList[String]] = deriveTypeInformation
12-
implicit val intListTi: TypeInformation[NonEmptyList[Int]] = deriveTypeInformation
13-
14-
def createSerializer[T: TypeInformation] =
15-
implicitly[TypeInformation[T]].createSerializer(new SerializerConfigImpl())
1610

1711
it should "derive for NEL[String]" in {
18-
val ser = createSerializer[NonEmptyList[String]]
19-
roundtrip(ser, NonEmptyList.one("doo"))
12+
NonEmptyList.one("doo") should haveTypeInfoAndBeSerializable[NonEmptyList[String]]
2013
}
2114

2215
it should "derive for NEL[Int]" in {
23-
val ser = createSerializer[NonEmptyList[Int]]
24-
roundtrip(ser, NonEmptyList.one(1))
16+
NonEmptyList.one(1) should haveTypeInfoAndBeSerializable[NonEmptyList[Int]]
2517
}
2618
}

modules/flink-common-api/src/test/scala/org/apache/flinkx/api/SerializerTest.scala

Lines changed: 47 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import org.apache.flink.api.common.serialization.SerializerConfigImpl
55
import org.apache.flink.api.common.typeinfo.TypeInformation
66
import org.apache.flink.api.common.typeutils.base.StringSerializer
77
import org.apache.flink.api.java.typeutils.runtime.NullableSerializer
8-
import org.apache.flinkx.api.SerializerTest.DeeplyNested.ModeNested.SuperNested.{Egg, Food}
8+
import org.apache.flinkx.api.SerializerTest.DeeplyNested.ModeNested.SuperNested.{Egg, Food, Ham}
99
import org.apache.flinkx.api.SerializerTest.NestedRoot.NestedMiddle.NestedBottom
1010
import org.apache.flinkx.api.SerializerTest._
1111
import org.apache.flinkx.api.serializer.{CaseClassSerializer, nullable}
@@ -21,69 +21,56 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors with Test
2121
val ec = new SerializerConfigImpl()
2222

2323
it should "derive serializer for simple class" in {
24-
val ser = implicitly[TypeInformation[Simple]].createSerializer(ec)
25-
all(ser, Simple(1, "foo"))
24+
Simple(1, "foo") should haveTypeInfoAndBeSerializable[Simple]
2625
}
2726

2827
it should "derive serializer for java classes" in {
29-
val ser = implicitly[TypeInformation[SimpleJava]].createSerializer(ec)
30-
all(ser, SimpleJava(1, "foo"))
28+
SimpleJava(1, "foo") should beSerializable[SimpleJava]
3129
}
3230

3331
it should "derive serializer for java.time classes" in {
34-
val ser = implicitly[TypeInformation[JavaTime]].createSerializer(ec)
35-
all(ser, JavaTime(Instant.now(), LocalDate.now(), LocalDateTime.now()))
32+
JavaTime(Instant.now(), LocalDate.now(), LocalDateTime.now()) should haveTypeInfoAndBeSerializable[JavaTime]
3633
}
3734

3835
it should "derive nested classes" in {
39-
val ser = implicitly[TypeInformation[Nested]].createSerializer(ec)
40-
all(ser, Nested(Simple(1, "foo")))
36+
Nested(Simple(1, "foo")) should haveTypeInfoAndBeSerializable[Nested]
4137
}
4238

4339
it should "derive for ADTs" in {
44-
val ser = implicitly[TypeInformation[ADT]].createSerializer(ec)
45-
all(ser, Foo("a"))
46-
all(ser, Bar(1))
40+
Foo("a") should haveTypeInfoAndBeSerializable[ADT](nullable = false)
41+
Bar(1) should haveTypeInfoAndBeSerializable[ADT](nullable = false)
4742
}
4843

4944
it should "derive for ADTs with case objects" in {
50-
val ser = implicitly[TypeInformation[ADT2]].createSerializer(ec)
51-
all(ser, Foo2)
52-
all(ser, Bar2)
45+
Foo2 should haveTypeInfoAndBeSerializable[ADT2](nullable = false)
46+
Bar2 should haveTypeInfoAndBeSerializable[ADT2](nullable = false)
5347
}
5448

5549
it should "derive for deeply nested classes" in {
56-
val ser = implicitly[TypeInformation[Egg]].createSerializer(ec)
57-
all(ser, Egg(1))
50+
Egg(1) should haveTypeInfoAndBeSerializable[Egg]
5851
}
5952

6053
it should "derive for deeply nested adts" in {
61-
val ser = implicitly[TypeInformation[Food]].createSerializer(ec)
62-
all(ser, Egg(1))
54+
Egg(1) should haveTypeInfoAndBeSerializable[Food](nullable = false)
6355
}
6456

6557
it should "derive for nested ADTs" in {
66-
val ser = implicitly[TypeInformation[WrappedADT]].createSerializer(ec)
67-
all(ser, WrappedADT(Foo("a")))
68-
all(ser, WrappedADT(Bar(1)))
58+
WrappedADT(Foo("a")) should haveTypeInfoAndBeSerializable[WrappedADT]
59+
WrappedADT(Bar(1)) should haveTypeInfoAndBeSerializable[WrappedADT]
6960
}
7061

7162
it should "derive for generic ADTs" in {
72-
val ser = implicitly[TypeInformation[Param[Int]]].createSerializer(ec)
73-
all(ser, P2(1))
63+
P2(1) should haveTypeInfoAndBeSerializable[Param[Int]](nullable = false)
7464
}
7565

7666
it should "derive seq" in {
77-
val ser = implicitly[TypeInformation[SimpleSeq]].createSerializer(ec)
78-
noKryo(ser)
79-
serializable(ser)
67+
SimpleSeq(Seq(Simple(1, "a"))) should haveTypeInfoAndBeSerializable[SimpleSeq]
8068
}
8169

8270
it should "derive list of ADT" in {
83-
val ser = implicitly[TypeInformation[List[ADT]]].createSerializer(ec)
84-
all(ser, List(Foo("a")))
85-
roundtrip(ser, ::(Foo("a"), Nil))
86-
roundtrip(ser, Nil)
71+
List(Foo("a")) should haveTypeInfoAndBeSerializable[List[ADT]](nullable = false)
72+
::(Foo("a"), Nil) should beSerializable[List[ADT]](nullable = false)
73+
Nil should beSerializable[List[ADT]](nullable = false)
8774
}
8875

8976
it should "derive recursively" in {
@@ -92,24 +79,19 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors with Test
9279
}
9380

9481
it should "derive list" in {
95-
val ser = implicitly[TypeInformation[List[Simple]]].createSerializer(ec)
96-
all(ser, List(Simple(1, "a")))
82+
List(Simple(1, "a")) should haveTypeInfoAndBeSerializable[List[Simple]](nullable = false)
9783
}
9884

9985
it should "derive nested list" in {
100-
val ser = implicitly[TypeInformation[List[SimpleList]]].createSerializer(ec)
101-
all(ser, List(SimpleList(List(1))))
86+
List(SimpleList(List(1))) should haveTypeInfoAndBeSerializable[List[SimpleList]](nullable = false)
10287
}
10388

10489
it should "derive seq of seq" in {
105-
val ser = implicitly[TypeInformation[SimpleSeqSeq]].createSerializer(ec)
106-
noKryo(ser)
107-
serializable(ser)
90+
SimpleSeqSeq(Seq(Seq(Simple(1, "a")))) should haveTypeInfoAndBeSerializable[SimpleSeqSeq]
10891
}
10992

11093
it should "derive generic type bounded classes" in {
111-
val ser = implicitly[TypeInformation[BoundADT[Foo]]].createSerializer(ec)
112-
noKryo(ser)
94+
BoundADT(Foo("a")) should haveTypeInfoAndBeSerializable[BoundADT[Foo]]
11395
}
11496

11597
// it should "derive nested generic type bounded classes" in {
@@ -126,114 +108,97 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors with Test
126108

127109
it should "be serializable in case of annotations on classes" in {
128110
val ser = implicitly[TypeInformation[Annotated]].createSerializer(ec)
129-
serializable(ser)
111+
javaSerializable(ser)
130112
}
131113

132114
it should "be serializable in case of annotations on subtypes" in {
133115
val ser = implicitly[TypeInformation[Ann]].createSerializer(ec)
134-
serializable(ser)
116+
javaSerializable(ser)
135117
}
136118

137119
it should "serialize Option" in {
138-
val ser = implicitly[TypeInformation[SimpleOption]].createSerializer(ec)
139-
all(ser, SimpleOption(None))
140-
roundtrip(ser, SimpleOption(Some("foo")))
120+
SimpleOption(None) should haveTypeInfoAndBeSerializable[SimpleOption]
121+
SimpleOption(Some("foo")) should beSerializable[SimpleOption]
141122
}
142123

143124
it should "serialize Either" in {
144-
val ser = implicitly[TypeInformation[SimpleEither]].createSerializer(ec)
145-
all(ser, SimpleEither(Left("foo")))
146-
roundtrip(ser, SimpleEither(Right(42)))
125+
SimpleEither(Left("foo")) should haveTypeInfoAndBeSerializable[SimpleEither]
126+
SimpleEither(Right(42)) should beSerializable[SimpleEither]
147127
}
148128

149129
it should "serialize nested list of ADT" in {
150-
val ser = implicitly[TypeInformation[ListADT]].createSerializer(ec)
151-
all(ser, ListADT(Nil))
152-
roundtrip(ser, ListADT(List(Foo("a"))))
130+
ListADT(Nil) should haveTypeInfoAndBeSerializable[ListADT]
131+
ListADT(List(Foo("a"))) should beSerializable[ListADT]
153132
}
154133

155134
it should "derive multiple instances of generic class" in {
156-
val ser = implicitly[TypeInformation[Generic[SimpleOption]]].createSerializer(ec)
157-
val ser2 = implicitly[TypeInformation[Generic[Simple]]].createSerializer(ec)
158-
all(ser, Generic(SimpleOption(None), Bar(0)))
159-
all(ser2, Generic(Simple(0, "asd"), Bar(0)))
135+
Generic(SimpleOption(None), Bar(0)) should haveTypeInfoAndBeSerializable[Generic[SimpleOption]]
136+
Generic(Simple(0, "asd"), Bar(0)) should haveTypeInfoAndBeSerializable[Generic[Simple]]
160137
}
161138

162139
it should "serialize nil" in {
163-
val ser = implicitly[TypeInformation[NonEmptyList[String]]].createSerializer(ec)
164-
roundtrip(ser, NonEmptyList.one("a"))
140+
NonEmptyList.one("a") should haveTypeInfoAndBeSerializable[NonEmptyList[String]]
165141
}
166142

167143
it should "serialize unit" in {
168-
val ser = implicitly[TypeInformation[Unit]].createSerializer(ec)
169-
roundtrip(ser, ())
144+
() should haveTypeInfoAndBeSerializable[Unit](nullable = false)
170145
}
171146

172147
it should "serialize triple-nested case clases" in {
173-
val ser = implicitly[TypeInformation[Seq[NestedBottom]]].createSerializer(ec)
174-
roundtrip(ser, List(NestedBottom(Some("a"), None)))
148+
List(NestedBottom(Some("a"), None)) should haveTypeInfoAndBeSerializable[Seq[NestedBottom]](nullable = false)
175149
}
176150

177151
it should "serialize classes with type mapper" in {
178152
import MappedTypeInfoTest._
179-
val ser = implicitly[TypeInformation[WrappedString]].createSerializer(ec)
180153
val str = new WrappedString()
181154
str.put("foo")
182-
roundtrip(ser, str)
155+
str should haveTypeInfoAndBeSerializable[WrappedString](nullable = false)
183156
}
184157

185158
it should "serialize bigint" in {
186-
val ser = implicitly[TypeInformation[BigInt]].createSerializer(ec)
187-
roundtrip(ser, BigInt(123))
159+
BigInt(123) should haveTypeInfoAndBeSerializable[BigInt](nullable = false)
188160
}
189161

190162
it should "serialize bigdec" in {
191-
val ser = implicitly[TypeInformation[BigDecimal]].createSerializer(ec)
192-
roundtrip(ser, BigDecimal(123))
163+
BigDecimal(123) should haveTypeInfoAndBeSerializable[BigDecimal](nullable = false)
193164
}
194165

195166
it should "serialize uuid" in {
196-
val ser = implicitly[TypeInformation[UUID]].createSerializer(ec)
197-
roundtrip(ser, UUID.randomUUID())
167+
UUID.randomUUID() should haveTypeInfoAndBeSerializable[UUID](nullable = false)
198168
}
199169

200170
it should "serialize case class with private field" in {
201-
val ser = implicitly[TypeInformation[PrivateField]].createSerializer(ec)
202-
roundtrip(ser, PrivateField("abc"))
171+
PrivateField("abc") should haveTypeInfoAndBeSerializable[PrivateField]
203172
}
204173

205174
it should "serialize a case class overriding a field" in {
206-
val ser = implicitly[TypeInformation[ExtendingCaseClass]].createSerializer(ec)
207-
roundtrip(ser, ExtendingCaseClass("abc", "def"))
175+
ExtendingCaseClass("abc", "def") should haveTypeInfoAndBeSerializable[ExtendingCaseClass]
208176
}
209177

210178
it should "serialize a null case class" in {
211-
val ser = implicitly[TypeInformation[Simple]].createSerializer(ec)
212-
roundtrip(ser, null)
179+
val data: Simple = null
180+
data should haveTypeInfoAndBeSerializable[Simple]
213181
}
214182

215183
it should "serialize a case class with nullable field" in {
216-
val ser = implicitly[TypeInformation[NullableField]].createSerializer(ec)
217-
roundtrip(ser, NullableField(null, Bar(1)))
184+
Bar(1) should haveTypeInfoAndBeSerializable[Bar]
218185
}
219186

220187
it should "serialize a case class with a nullable field of a case class with no arity" in {
221-
val ser = implicitly[TypeInformation[NullableFieldWithNoArity]].createSerializer(ec)
222-
roundtrip(ser, NullableFieldWithNoArity(null))
188+
NullableFieldWithNoArity(null) should haveTypeInfoAndBeSerializable[NullableFieldWithNoArity]
223189
}
224190

225191
it should "serialize nullable fields" in {
226192
val ser = implicitly[TypeInformation[SimpleJava]].createSerializer(ec)
227-
roundtrip(ser, SimpleJava(null, null))
193+
SimpleJava(null, null) should haveTypeInfoAndBeSerializable[SimpleJava]
228194
val ccser = ser.asInstanceOf[CaseClassSerializer[SimpleJava]]
229195
// IntSerializer doesn't handle null so it's wrapped in a NullableSerializer
230196
ccser.getFieldSerializers()(0) shouldBe a[NullableSerializer[Integer]]
231197
ccser.getFieldSerializers()(1) shouldBe a[StringSerializer] // StringSerializer natively handles null
232198
}
233199

234200
it should "serialize a case class with a nullable field of a fixed size case class" in {
235-
val ser = implicitly[TypeInformation[NullableFixedSizeCaseClass]].createSerializer(ec)
236-
roundtrip(ser, NullableFixedSizeCaseClass(null))
201+
NullableFixedSizeCaseClass(null) should haveTypeInfoAndBeSerializable[NullableFixedSizeCaseClass]
237202
}
238203

239204
}

0 commit comments

Comments
 (0)