Conversation

@jayantdb (Contributor) commented Sep 25, 2025

What changes were proposed in this pull request?

This PR fixes an issue where the avgOffsetsBehindLatest metric in the Kafka sources object of the streaming progress JSON was displayed in scientific notation (e.g., 2.70941269E8). The fix uses safe Decimal casting so that the value is displayed in a human-readable format.
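A minimal, standalone sketch of the idea (illustrative only, not the exact code in this PR): Double.toString switches to scientific notation for magnitudes of 1e7 and above, which is how the raw metric string becomes "2.70941269E8"; routing the string back through java.math.BigDecimal and asking for its plain representation removes the exponent.

object MetricFormatSketch {
  def main(args: Array[String]): Unit = {
    val avgOffsetsBehindLatest = 270941269.0

    // Double.toString uses scientific notation for values >= 1e7.
    println(avgOffsetsBehindLatest.toString) // prints 2.70941269E8

    // Parsing the string with BigDecimal and printing the plain form drops the
    // exponent. (The PR's actual formatting additionally keeps one decimal
    // place, e.g. "270941269.0", as shown in the examples below.)
    val plain = new java.math.BigDecimal(avgOffsetsBehindLatest.toString).toPlainString
    println(plain) // prints 270941269
  }
}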

Before change:

{
  "id" : "19815c70-c0f1-4e89-8360-2ef444a35b76",
  "runId" : "6c243067-fff6-47ef-99d8-41db0e278949",
  "name" : "KafkaMetricsTest",
  "timestamp" : "2025-09-23T06:00:00.000Z",
  "batchId" : 1250,
  "batchDuration" : 111255,
  "numInputRows" : 800000,
  "inputRowsPerSecond" : 75291.2831516931
  "processedRowsPerSecond" : 71906.88058963642,
  "durationMs" : {
    "addBatch" : 110481,
    "commitBatch" : 410,
    "commitOffsets" : 107,
    "getBatch" : 0,
    "latestOffset" : 2,
    "queryPlanning" : 179,
    "triggerExecution" : 111255,
    "walCommit" : 74
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[bigdata_omsTrade_cdc]]",
    "startOffset" : {
      "bigdata_omsTrade_cdc" : {
        "0" : 18424809459
      }
    },
    "endOffset" : {
      "bigdata_omsTrade_cdc" : {
        "0" : 18432809459
      }
    },
    "latestOffset" : {
      "bigdata_omsTrade_cdc" : {
        "0" : 18703750728
      }
    },
    "numInputRows" : 800000,
    "inputRowsPerSecond" : 75291.2831516931,
    "processedRowsPerSecond" : 71906.88058963642,
    "metrics" : {
      "avgOffsetsBehindLatest" : "2.70941269E8",
      "maxOffsetsBehindLatest" : "270941269",
      "minOffsetsBehindLatest" : "270941269"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[s3://<masked-storage/__unitystorage/schemas/75bc9f38-9af3-4af9-852b-7077489f93d3/tables/c963dc05-f685-4fe8-a744-500cb40ce28a]",
    "numOutputRows" : -1
  }
}

After change:

{
  "id" : "19815c70-c0f1-4e89-8360-2ef444a35b76",
  "runId" : "6c243067-fff6-47ef-99d8-41db0e278949",
  "name" : "KafkaMetricsTest",
  "timestamp" : "2025-09-23T06:00:00.000Z",
  "batchId" : 1250,
  "batchDuration" : 111255,
  "numInputRows" : 800000,
  "inputRowsPerSecond" : 75291.3,
  "processedRowsPerSecond" : 71906.9,
  "durationMs" : {
    "addBatch" : 110481,
    "commitBatch" : 410,
    "commitOffsets" : 107,
    "getBatch" : 0,
    "latestOffset" : 2,
    "queryPlanning" : 179,
    "triggerExecution" : 111255,
    "walCommit" : 74
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[bigdata_omsTrade_cdc]]",
    "startOffset" : {
      "bigdata_omsTrade_cdc" : {
        "0" : 18424809459
      }
    },
    "endOffset" : {
      "bigdata_omsTrade_cdc" : {
        "0" : 18432809459
      }
    },
    "latestOffset" : {
      "bigdata_omsTrade_cdc" : {
        "0" : 18703750728
      }
    },
    "numInputRows" : 800000,
    "inputRowsPerSecond" : 75291.3,
    "processedRowsPerSecond" : 71906.9,
    "metrics" : {
      "avgOffsetsBehindLatest" : "270941269.0",
      "maxOffsetsBehindLatest" : "270941269",
      "minOffsetsBehindLatest" : "270941269"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[s3://<masked-storage/__unitystorage/schemas/75bc9f38-9af3-4af9-852b-7077489f93d3/tables/c963dc05-f685-4fe8-a744-500cb40ce28a]",
    "numOutputRows" : -1
  }
}

Why are the changes needed?

The current formatting is not user-friendly. A user can easily misread 2.70941269E8 as 2.7 instead of 270,941,269, since the E is easy to miss. This fix improves the readability of the Spark Structured Streaming progress metrics JSON.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Run this Maven test:

./build/mvn -pl sql/core,sql/api \
-am test \
-DwildcardSuites=org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite \
-DwildcardTestName="SPARK-53690"

Results:

Run completed in 8 seconds, 799 milliseconds.
Total number of tests run: 13
Suites: completed 2, aborted 0
Tests: succeeded 13, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for Spark Project Parent POM 4.1.0-SNAPSHOT:
[INFO] 
[INFO] Spark Project Parent POM ........................... SUCCESS [  1.146 s]
[INFO] Spark Project Tags ................................. SUCCESS [  1.421 s]
[INFO] Spark Project Sketch ............................... SUCCESS [  1.409 s]
[INFO] Spark Project Common Java Utils .................... SUCCESS [  1.831 s]
[INFO] Spark Project Common Utils ......................... SUCCESS [  1.697 s]
[INFO] Spark Project Local DB ............................. SUCCESS [  4.101 s]
[INFO] Spark Project Networking ........................... SUCCESS [ 52.442 s]
[INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [  5.687 s]
[INFO] Spark Project Variant .............................. SUCCESS [  0.782 s]
[INFO] Spark Project Unsafe ............................... SUCCESS [  2.779 s]
[INFO] Spark Project Connect Shims ........................ SUCCESS [  0.712 s]
[INFO] Spark Project Launcher ............................. SUCCESS [  3.582 s]
[INFO] Spark Project Core ................................. SUCCESS [ 26.624 s]
[INFO] Spark Project SQL API .............................. SUCCESS [  2.334 s]
[INFO] Spark Project Catalyst ............................. SUCCESS [  6.520 s]
[INFO] Spark Project SQL .................................. SUCCESS [ 40.679 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  02:34 min
[INFO] Finished at: 2025-10-01T17:57:12+05:30
[INFO] ------------------------------------------------------------------------

Was this patch authored or co-authored using generative AI tooling?

No

@HeartSaVioR (Contributor) left a comment

It looks good to me. It'd be ideal if we could come up with a test in the Kafka data source to verify the change end-to-end. If the number needs to be huge and that's not appropriate for testing, please let me know and we can skip adding an e2e test.

("metrics" -> safeMapToJValue[String](
metrics,
(metricsName, s) =>
// SPARK-53690:
Contributor

Technically speaking, we shouldn't have Kafka-specific handling here; instead we should build some sort of API to indicate whether the exponential format has to be removed for a specific metric.

But it's a single occurrence and Kafka is a built-in connector, so I guess it is OK. A safer approach would be to check whether the source is Kafka, but the metric name is probably not common enough to conflict with others.
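For illustration, a hedged sketch of the name-keyed guard discussed above; the object, the NoExponentMetrics set, and formatWithoutExponent are hypothetical names for this sketch, not Spark APIs or the code this PR ships.

// Hypothetical sketch only: the names below are illustrative.
object MetricJsonGuard {
  private val NoExponentMetrics = Set("avgOffsetsBehindLatest")

  // Re-renders a numeric metric string in plain decimal notation; falls back
  // to the original string if it does not parse as a number.
  private def formatWithoutExponent(value: String): String =
    scala.util.Try(new java.math.BigDecimal(value).toPlainString).getOrElse(value)

  def render(metricName: String, value: String): String =
    if (NoExponentMetrics.contains(metricName)) formatWithoutExponent(value) else value
}

Keying on the metric name keeps the generic serialization path connector-agnostic while still only touching the single Kafka metric discussed in this PR.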

Contributor Author

Yup, I agree. Since it's just one metric from Kafka, it should be fine to handle it here.

| "total" : 100
| },
| "stateOperators" : [ ],
| "sources" : [ {
Contributor

Is there a consistent way to trigger scientific notation in a Kafka data source test? I understand this is the simplest way to test this, but we'd like to make sure it works with the actual Kafka data source.

Contributor Author

Hm, I tried running a very large input batch from Kafka, but the test case got stuck for 10+ minutes. I am unsure whether such a bulky test case would be good for CI builds. What are your thoughts?

Contributor

It's OK to skip it. Instead, could you please update the JSON format string in the test to reflect a real Kafka read? Could you paste the event JSON (you can redact some information) in which you saw the issue?

You can change the numbers for the test, but let's not make them entirely arbitrary; they should look like a real event.
