arnaud-daroussin
Contributor

@arnaud-daroussin arnaud-daroussin commented Aug 17, 2025

Hi,
This is a follow-up to the previous PRs that optimized CaseClassSerializer copy and foreach. I was waiting for the Flink 1 and 2 codebases to be reunified before submitting it:

  • In the first commit, I implement a specialized direct-memory TypeSerializer#copy(DataInputView, DataOutputView) wherever it is faster than the generic deserialize-then-serialize path.
  • In the second commit, I replace the for comprehensions with faster while loops.
  • In the third commit, I add checks in TestUtils:
    • Add checks on serializers: instance and binary copy
    • Add checks on type information
    • Add Scalatest matchers "haveTypeInfo" and "beSerializable"
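
To illustrate the idea behind the first commit, here is a minimal sketch of a generic binary copy next to a specialized one. It is not the code from this PR: `Point`, `BinaryCopySketch` and all method names are hypothetical, and `java.io.DataInput`/`DataOutput` stand in for Flink's `DataInputView`/`DataOutputView` (which extend them) so that the example runs without Flink on the classpath.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}

// Hypothetical record type, used only for this illustration.
final case class Point(id: Int, ts: Long)

object BinaryCopySketch {
  def serialize(p: Point, out: DataOutput): Unit = {
    out.writeInt(p.id)
    out.writeLong(p.ts)
  }

  def deserialize(in: DataInput): Point = Point(in.readInt(), in.readLong())

  // Generic copy: materializes a Point only to write it straight back.
  def genericCopy(in: DataInput, out: DataOutput): Unit =
    serialize(deserialize(in), out)

  // Specialized copy: forwards the primitive fields without allocating a Point.
  def directCopy(in: DataInput, out: DataOutput): Unit = {
    out.writeInt(in.readInt())
    out.writeLong(in.readLong())
  }

  // Helper: serialized form of a Point.
  def toBytes(p: Point): Array[Byte] = {
    val buf = new ByteArrayOutputStream()
    serialize(p, new DataOutputStream(buf))
    buf.toByteArray
  }

  // Helper: runs a copy function over serialized bytes and returns the output bytes.
  def copyWith(copy: (DataInput, DataOutput) => Unit, bytes: Array[Byte]): Array[Byte] = {
    val buf = new ByteArrayOutputStream()
    copy(new DataInputStream(new ByteArrayInputStream(bytes)), new DataOutputStream(buf))
    buf.toByteArray
  }
}
```

Both copy functions should produce identical bytes; the specialized one simply skips the intermediate object, which is where a gain would be expected for records made of primitives.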

I think the checks on type information need to be discussed. On the one hand, they were useful for checking our custom type information and even identified bugs. On the other hand, some checks are tricky to implement correctly, most notably those on the arity and the total fields.

The idea is to check the type information against the Class it is supposed to describe, using reflection. But the check on total fields requires complex recursion over each field. Even the definition of "total fields" given in the javadoc is hard to understand.

For now, I had to remove the checks on arity and total fields because they are not correct for some case classes and sealed traits:

  • CoproductTypeInformation always sets an arity of 1, but my check found an arity of 0 because traits don't have fields.
  • TupleTypeInfoBase, the parent of CaseClassTypeInfo, doesn't handle the case stated in the javadoc where "The total number of fields must be at least 1."
  • The check on total fields doesn't handle type params.
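
A rough sketch of what a reflection-based arity check can look like, and why it reports 0 for a sealed trait. This is an assumption about the general approach, not the TestUtils code from this PR; `ArityCheckSketch`, `reflectedArity` and the `Shape` hierarchy are hypothetical names.

```scala
import java.lang.reflect.Modifier

object ArityCheckSketch {
  // Hypothetical hierarchy, used only for this illustration.
  sealed trait Shape
  final case class Circle(radius: Double) extends Shape
  final case class Rect(width: Double, height: Double) extends Shape

  // Counts the instance fields declared on the class itself,
  // ignoring static and compiler-generated (synthetic) fields.
  def reflectedArity(cls: Class[_]): Int =
    cls.getDeclaredFields.count { f =>
      !Modifier.isStatic(f.getModifiers) && !f.isSynthetic
    }
}
```

With these definitions, `reflectedArity(classOf[Rect])` matches `productArity` of a `Rect`, while `reflectedArity(classOf[Shape])` is 0, since a trait without members compiles to an interface with no fields. A type information that reports an arity of 1 for the trait would therefore fail a naive comparison.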

Let me know what you think about these checks. Do you think it is worth fixing all the edge cases to make them functional?

@arnaud-daroussin arnaud-daroussin force-pushed the perf-copy-and-while branch 2 times, most recently from ddf5534 to 7718ab5 Compare August 17, 2025 00:22
Add checks on type information
Add Scalatest matchers "haveTypeInfo" and beSerializable
@novakov-alexey
Collaborator

Looks interesting. Enjoy your holidays, you deserve them! :-)

I think we need to think about adding some micro-benchmark tests to assess all performance improvements on the spot (in CI or at least locally). I will have an example to share soon.

@novakov-alexey
Collaborator

While I am still checking this PR, I want to share this Apache flink-benchmark with my addition for Scala 3: https://github.com/novakov-alexey/flink-benchmarks?tab=readme-ov-file#diff-with-main-repository

Below are results after running all existing benchmarks and then my addition for Scala 3 in this repository:

[image: benchmark results chart]

Each horizontal bar shows the number of operations per millisecond.

@arnaud-daroussin
Contributor Author

arnaud-daroussin commented Aug 25, 2025

Nice!

As you may have noticed, I’ve been dealing with performance issues for several weeks. I’m in the process of migrating our most critical and most optimized application from Flink’s old Scala API to flink-scala-api, and thus from Kryo to Flinkx serializers.
However, in the most performance-sensitive areas (those with the largest states), we used custom Kryo serializers (which don’t use reflection) that were highly performant. And here, I’m struggling to achieve similar performance with Flinkx serializers.
I’ve identified several bottlenecks that I’ve been trying to fix in Flinkx serializers for several weeks:

  • using while loops instead of foreach.
  • boxing primitive types is very costly; the only solution is to use custom serializers for case classes, because line 117 of CaseClassSerializer calls value.productElement(i), which necessarily boxes primitives.
  • org.apache.flink.api.common.typeutils.base.StringSerializer seems surprisingly slow compared to serializing strings with Kryo. Even a simple org.apache.flink.core.memory.DataOutputSerializer.writeUTF() appears faster. Perhaps a StringSerializer in Flinkx that simply does out.writeUTF(record) would be faster; I still need to run tests to confirm this.
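
The boxing point can be sketched as follows. This is an illustration under assumptions, not the actual CaseClassSerializer: `Reading`, `BoxingSketch` and the method names are hypothetical, and `java.io.DataOutput` stands in for Flink's `DataOutputView`.

```scala
import java.io.{ByteArrayOutputStream, DataOutput, DataOutputStream}

object BoxingSketch {
  // Hypothetical record type, used only for this illustration.
  final case class Reading(sensor: Int, value: Double)

  // Generic path: productElement returns Any, so every primitive field
  // is boxed before it can be matched and written.
  def writeGeneric(p: Product, out: DataOutput): Unit = {
    var i = 0
    while (i < p.productArity) {
      p.productElement(i) match {
        case n: Int    => out.writeInt(n)
        case d: Double => out.writeDouble(d)
        case other     => throw new IllegalArgumentException(s"unsupported field: $other")
      }
      i += 1
    }
  }

  // Hand-written path: reads the fields directly, so nothing is boxed.
  def writeCustom(r: Reading, out: DataOutput): Unit = {
    out.writeInt(r.sensor)
    out.writeDouble(r.value)
  }

  // Helper: collects whatever a writer emits into a byte array.
  def bytesOf(write: DataOutput => Unit): Array[Byte] = {
    val buf = new ByteArrayOutputStream()
    write(new DataOutputStream(buf))
    buf.toByteArray
  }
}
```

Both writers emit the same bytes for a `Reading`; the difference is only in the allocations made along the way, which is why a per-type serializer can avoid a cost that a generic `Product`-based one cannot.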

In any case, perhaps we should add a benchmark to compare the performance of custom Flinkx serializers with custom Kryo serializers.

@novakov-alexey
Collaborator

I think something "custom" will always be faster than something generic, as the Flinkx serializers are. Feel free to bring more performance improvements.

In general, I think we should always stay faster than the standard Flink Row and Tuple serializers, or at least be on the same level. My latest local benchmarks show that we are at this level, and it is great to have reached this state.

@novakov-alexey
Collaborator

Thank you for the outstanding effort!

@novakov-alexey novakov-alexey merged commit 096d326 into flink-extended:master Aug 27, 2025
18 checks passed
@arnaud-daroussin arnaud-daroussin deleted the perf-copy-and-while branch August 28, 2025 08:41