
Conversation

arnaud-daroussin
Contributor

Hi @novakov-alexey,

Here is a small PR to improve the error message when trying to create type information for a recursive type. We ran into this case and thought the error could easily be detected early, failing fast with a comprehensible message instead of a stack overflow error like this:

java.lang.StackOverflowError
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$8.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$$outer(SynchronizedSymbols.scala:206)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$7.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$$outer(SynchronizedSymbols.scala:203)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.validTo(SynchronizedSymbols.scala:149)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.validTo$(SynchronizedSymbols.scala:157)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$7.validTo(SynchronizedSymbols.scala:203)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.$anonfun$typeParams$1(SynchronizedSymbols.scala:180)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.typeParams(SynchronizedSymbols.scala:149)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.typeParams$(SynchronizedSymbols.scala:165)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$7.typeParams(SynchronizedSymbols.scala:203)
	at scala.reflect.internal.Types$NoArgsTypeRef.typeParams(Types.scala:2162)
	at scala.reflect.internal.Definitions$DefinitionsClass.fullyInitializeType(Definitions.scala:245)
	at scala.reflect.internal.Types$Type.toString(Types.scala:937)
	at org.apache.flinkx.api.LowPrioImplicits.typeName(LowPrioImplicits.scala:73)
	at org.apache.flinkx.api.LowPrioImplicits.join(LowPrioImplicits.scala:27)
	at org.apache.flinkx.api.LowPrioImplicits.join$(LowPrioImplicits.scala:24)
	at org.apache.flinkx.api.serializers$.join(serializers.scala:173)
	at org.apache.flinkx.api.RecursiveTypeInfoTest.nodeTypeclass$macro$1$lzycompute$1(RecursiveTypeInfoTest.scala:14)
	at org.apache.flinkx.api.RecursiveTypeInfoTest.nodeTypeclass$macro$1$1(RecursiveTypeInfoTest.scala:14)
	at org.apache.flinkx.api.RecursiveTypeInfoTest.paramTypeclass$macro$3$lzycompute$1(RecursiveTypeInfoTest.scala:14)
	at org.apache.flinkx.api.RecursiveTypeInfoTest.paramTypeclass$macro$3$1(RecursiveTypeInfoTest.scala:14)
	at org.apache.flinkx.api.RecursiveTypeInfoTest.$anonfun$new$2(RecursiveTypeInfoTest.scala:14)
	at magnolia1.CallByNeed.value$lzycompute(magnolia.scala:985)
	at magnolia1.CallByNeed.value(magnolia.scala:981)
	at magnolia1.Param$$anon$7.typeclass(interface.scala:330)
	at org.apache.flinkx.api.LowPrioImplicits.$anonfun$join$1(LowPrioImplicits.scala:38)
	at scala.collection.immutable.ArraySeq.map(ArraySeq.scala:75)
	at scala.collection.immutable.ArraySeq.map(ArraySeq.scala:35)
	at org.apache.flinkx.api.LowPrioImplicits.join(LowPrioImplicits.scala:37)
	at org.apache.flinkx.api.LowPrioImplicits.join$(LowPrioImplicits.scala:24)
	at org.apache.flinkx.api.serializers$.join(serializers.scala:173)
[...]

When I tried to reproduce the stack overflow error here, I saw this commented-out test in SerializerTest:

  it should "derive recursively" in {
    // recursive is broken
    // val ti = implicitly[TypeInformation[Node]]
  }
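For context, a minimal recursive type along the lines of the Node used in that test is enough to make structural derivation loop, since every derivation step re-enters the same type. This is a hypothetical sketch; the actual Node definition in SerializerTest may differ:

```scala
// Hypothetical sketch of a recursive ADT like the Node in the test;
// the real definition may differ.
case class Node(value: Int, next: Option[Node])

// Structural derivation walks Node -> Option[Node] -> Node -> ...
// with no base case, hence the StackOverflowError at runtime in Scala 2.
val chain = Node(1, Some(Node(2, None)))
```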

I thought recursive type derivation could be fixed. I managed to hack LowPrioImplicits.join() to work with a recursive type, but CaseClassSerializer doesn't support recursion anywhere it computes values by iterating over its fields, at least:

  • TupleSerializerBase.getLength()
  • isImmutableType
    • copy()
  • isImmutableSerializer
    • CaseClassSerializer.duplicate()
      • CaseClassTypeInfo.createSerializer()

It's a dead end, so the fail-fast solution seems to be the way to go. Here is the resulting message instead of the stack overflow error:

Unsupported: recursivity detected in 'org.apache.flinkx.api.RecursiveTest.Node'.
org.apache.flink.util.FlinkRuntimeException: Unsupported: recursivity detected in 'org.apache.flinkx.api.RecursiveTest.Node'.
	at org.apache.flinkx.api.LowPrioImplicits.join(LowPrioImplicits.scala:31)
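The fail-fast check can be modeled as tracking which type names are currently being derived and aborting when one re-enters. This is only an illustrative sketch, not the PR's actual code: the real check lives in LowPrioImplicits.join and throws Flink's FlinkRuntimeException, and CycleGuard and its names are invented here:

```scala
import scala.collection.mutable

// Illustrative cycle detection during typeclass derivation:
// `deriving` holds the type names currently on the derivation stack.
object CycleGuard {
  private val deriving = mutable.Set.empty[String]

  def guarded[T](typeName: String)(derive: => T): T = {
    if (!deriving.add(typeName))
      throw new IllegalStateException(
        s"Unsupported: recursivity detected in '$typeName'.")
    try derive
    finally deriving -= typeName
  }
}

// A self-referential derivation immediately trips the guard:
def deriveNode(): Unit = CycleGuard.guarded("Node")(deriveNode())
```

A non-recursive derivation passes through the guard unchanged, while the recursive one fails fast with the descriptive message instead of overflowing the stack.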

Interestingly, Scala 3 detects the recursive type derivation problem at compile time, so this PR applies to Scala 2 only.

@novakov-alexey
Collaborator

Hi @arnaud-daroussin ,

Thanks for the PR.

I remember that recursive types are unfortunately not supported by this library. Do you see any possibility of supporting them in the future?

@novakov-alexey novakov-alexey merged commit 3ea037e into flink-extended:master Aug 25, 2025
18 checks passed
@arnaud-daroussin arnaud-daroussin deleted the failfast-recursive branch August 26, 2025 07:56
@arnaud-daroussin
Contributor Author

arnaud-daroussin commented Aug 26, 2025

> Hi @arnaud-daroussin,
>
> Thanks for the PR.
>
> I remember that recursive types are unfortunately not supported by this library. Do you see any possibility of supporting them in the future?

It seems very hard, almost impossible.

It was possible to hack something in LowPrioImplicits thanks to the TI cache: the idea is to store an incomplete temporary TI, with its field serializers left uninitialized, to break the recursion, and to initialize the field serializers only once, afterwards. The recursive lookup finds the temporary TI in the cache, so it doesn't try to initialize it a second time, effectively breaking the recursion.
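That hack can be sketched roughly as follows. Ti and TiCache are invented stand-ins for Flink's TypeInformation and the library's cache, purely to show the publish-incomplete-then-fill-in idea:

```scala
import scala.collection.mutable

// Minimal model: a TI whose field TIs are filled in after it is cached.
final class Ti(val name: String) {
  var fields: List[Ti] = Nil // left empty until after caching
}

object TiCache {
  private val cache = mutable.Map.empty[String, Ti]

  // fieldTypes maps a type name to the names of its field types.
  def derive(name: String, fieldTypes: Map[String, List[String]]): Ti =
    cache.getOrElse(name, {
      val ti = new Ti(name)
      cache(name) = ti // publish the incomplete TI first...
      // ...then recurse: a recursive field finds the cached entry
      // and stops, instead of overflowing the stack.
      ti.fields = fieldTypes.getOrElse(name, Nil).map(derive(_, fieldTypes))
      ti
    })
}
```

For a self-referential schema, the recursive field ends up pointing back at the very same (by then fully initialized) TI instance.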

If we want to handle recursion in CaseClassSerializer, we have to do something similar with a global cache for each recursively processed field (at least getLength, isImmutableType, isImmutableSerializer, createInstance, equals, hashCode and maybe the trickiest one, snapshotConfiguration) and find a coherent short-circuit for each (what is the length of a recursive serializer? In theory its length is infinite; in practice it can be -1).
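A recursion-aware getLength along those lines could carry a visited set and short-circuit to -1, Flink's convention for variable/unknown length. This is a sketch only; the real method is TupleSerializerBase.getLength, which takes no such parameters, and the 4-byte leaf size is an arbitrary stand-in:

```scala
// Sketch: tuple length with cycle detection; -1 means "unknown/unbounded".
def lengthOf(name: String,
             fieldsOf: Map[String, List[String]],
             visiting: Set[String] = Set.empty): Int =
  if (visiting.contains(name)) -1 // cycle: length would be infinite
  else fieldsOf.get(name) match {
    case None => 4 // leaf type: pretend a fixed 4-byte encoding
    case Some(fields) =>
      val lens = fields.map(lengthOf(_, fieldsOf, visiting + name))
      if (lens.contains(-1)) -1 else lens.sum // any unknown field poisons the total
  }
```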

Similar problems also occur in CaseClassTypeInfo and its parent TupleTypeInfoBase: getFlatFields, getTypeAt, getTotalFields, equals, hashCode and toString.

So I think it's possible, but is it worth it?

@arnaud-daroussin
Contributor Author

And for Scala 3, I have no clue how to fix the compilation error:

flink-scala-api/modules/flink-common-api/src/test/scala/org/apache/flinkx/api/SerializerTest.scala:91:87
No given instance of type org.apache.flink.api.common.typeinfo.TypeInformation[
  Option[org.apache.flinkx.api².SerializerTest.Node]] was found.
I found:

    org.apache.flinkx.api².serializers.deriveTypeInformation[
      Option[org.apache.flinkx.api².SerializerTest.Node]](
      {
        final class $anon() extends Object(), Serializable {
          type MirroredMonoType = Option[org.apache.flinkx.api².SerializerTest.Node]
          }
        (new $anon():Object & Serializable)
      }.$asInstanceOf[
        
          scala.deriving.Mirror.Sum{
            type MirroredMonoType² =
              Option[org.apache.flinkx.api².SerializerTest.Node];
              type MirroredType = Option[org.apache.flinkx.api².SerializerTest.Node]
                ;
            type MirroredLabel = ("Option" : String);
              type MirroredElemTypes = (None.type,
                Some[org.apache.flinkx.api².SerializerTest.Node]);
              type MirroredElemLabels = (("None$" : String), ("Some" : String))
          }
        
      ],
      scala.reflect.ClassTag.apply[
        Option[org.apache.flinkx.api².SerializerTest.Node]](classOf[Option]),
      {
        final class $anon²() extends Object(), org.apache.flinkx.api².TypeTag[
          Option[org.apache.flinkx.api².SerializerTest.Node]] {
          override lazy val isModule: Boolean = false
          override lazy val isCachable: Boolean = true
          override lazy val toString: String =
            "scala.Option[org.apache.flinkx.api.SerializerTest.Node]"
        }
        new $anon²():
          
            org.apache.flinkx.api².TypeTag[
              Option[org.apache.flinkx.api².SerializerTest.Node]]
          
      }:
        org.apache.flinkx.api².TypeTag[
          Option[org.apache.flinkx.api².SerializerTest.Node]]
    )

But method deriveTypeInformation in trait LowPrioImplicits does not match type org.apache.flink.api.common.typeinfo.TypeInformation[
  Option[org.apache.flinkx.api².SerializerTest.Node]]

where:    $anon             is a anonymous class in the initializer of value typeclass
          $anon²            is a anonymous class in the initializer of value typeclass
          MirroredMonoType  is a type in the empty package which is an alias of Option[org.apache.flinkx.api².SerializerTest.Node]
          MirroredMonoType² is a type in trait Mirror
          api               is a package in package org.apache.flink
          api²              is a package in package org.apache.flinkx
.
    val exception = intercept[FlinkRuntimeException](implicitly[TypeInformation[Node]])

@novakov-alexey
Collaborator

> > Hi @arnaud-daroussin,
> > Thanks for the PR.
> > I remember that recursive types are unfortunately not supported by this library. Do you see any possibility of supporting them in the future?
>
> It seems very hard, almost impossible.
>
> It was possible to hack something in LowPrioImplicits thanks to the TI cache: the idea is to store an incomplete temporary TI, with its field serializers left uninitialized, to break the recursion, and to initialize the field serializers only once, afterwards. The recursive lookup finds the temporary TI in the cache, so it doesn't try to initialize it a second time, effectively breaking the recursion.
>
> If we want to handle recursion in CaseClassSerializer, we have to do something similar with a global cache for each recursively processed field (at least getLength, isImmutableType, isImmutableSerializer, createInstance, equals, hashCode and maybe the trickiest one, snapshotConfiguration) and find a coherent short-circuit for each (what is the length of a recursive serializer? In theory its length is infinite; in practice it can be -1).
>
> Similar problems also occur in CaseClassTypeInfo and its parent TupleTypeInfoBase: getFlatFields, getTypeAt, getTotalFields, equals, hashCode and toString.
>
> So I think it's possible, but is it worth it?

What you described sounds like memoization and dynamic programming. https://stackoverflow.com/a/6185005/6176274
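For reference, a minimal memoization helper in Scala looks like this (a generic sketch, independent of Flink or this library):

```scala
import scala.collection.mutable

// Wraps a function so each distinct argument is computed only once;
// repeated calls with the same argument hit the cache.
def memoize[A, B](f: A => B): A => B = {
  val cache = mutable.Map.empty[A, B]
  a => cache.getOrElseUpdate(a, f(a))
}

var calls = 0
val square = memoize { (x: Int) => calls += 1; x * x }
```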

I met people who wanted this library to support recursive types. I think they wanted to parse JSON data, which can have recursively nested documents inside.

Yes, it would be nice to solve this problem in another issue. Perhaps you could open a new issue and roughly describe its complexity and how it could be solved, at least theoretically?

@novakov-alexey
Collaborator

> And for Scala 3, I have no clue how to fix the compilation error:

I guess it would be a question for the Scala Users Forum https://users.scala-lang.org/, where Scala Center folks could help with understanding the Scala 3 Mirror API. It is something really advanced.
