From 6cb4d2b31b74f96a9987be926239b427d8bb475d Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Mon, 11 Aug 2025 14:09:44 +0800
Subject: [PATCH 1/3] init

---
 .../server/ChunkFetchRequestHandler.java           |  4 ++--
 .../network/server/TransportRequestHandler.java    | 16 ++++++++--------
 .../spark/network/shuffle/ErrorHandler.java        |  7 +++----
 .../apache/spark/network/util/JavaUtils.java       | 13 +++++++++++++
 .../org/apache/spark/util/SparkErrorUtils.scala    | 17 +++--------------
 .../test/org/apache/spark/JavaAPISuite.java        |  4 ++--
 dev/checkstyle.xml                                 |  4 ++++
 scalastyle-config.xml                              |  7 ++++++-
 .../receiver/ReceiverSupervisorImpl.scala          |  7 +++----
 9 files changed, 44 insertions(+), 35 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
index 11641ddacd58b..c7d4d671dec7d 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
@@ -19,7 +19,6 @@
 
 import java.net.SocketAddress;
 
-import com.google.common.base.Throwables;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
@@ -36,6 +35,7 @@
 import org.apache.spark.network.protocol.ChunkFetchRequest;
 import org.apache.spark.network.protocol.ChunkFetchSuccess;
 import org.apache.spark.network.protocol.Encodable;
+import org.apache.spark.network.util.JavaUtils;
 
 import static org.apache.spark.network.util.NettyUtils.*;
 
@@ -114,7 +114,7 @@ public void processFetchRequest(
         MDC.of(LogKeys.STREAM_CHUNK_ID, msg.streamChunkId),
         MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
       respond(channel, new ChunkFetchFailure(msg.streamChunkId,
-        Throwables.getStackTraceAsString(e)));
+        JavaUtils.stackTraceToString(e)));
       return;
     }
 
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index 37174a66c6d6d..464d4d9eb378f 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -21,7 +21,6 @@
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 
-import com.google.common.base.Throwables;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 
@@ -33,6 +32,7 @@
 import org.apache.spark.network.buffer.NioManagedBuffer;
 import org.apache.spark.network.client.*;
 import org.apache.spark.network.protocol.*;
+import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.TransportFrameDecoder;
 
 import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
@@ -145,7 +145,7 @@ private void processStreamRequest(final StreamRequest req) {
       logger.error("Error opening stream {} for request from {}", e,
         MDC.of(LogKeys.STREAM_ID, req.streamId),
         MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
-      respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
+      respond(new StreamFailure(req.streamId, JavaUtils.stackTraceToString(e)));
       return;
     }
 
@@ -172,14 +172,14 @@ public void onSuccess(ByteBuffer response) {
 
         @Override
         public void onFailure(Throwable e) {
-          respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+          respond(new RpcFailure(req.requestId, JavaUtils.stackTraceToString(e)));
         }
       });
     } catch (Exception e) {
       logger.error("Error while invoking RpcHandler#receive() on RPC id {} from {}", e,
         MDC.of(LogKeys.REQUEST_ID, req.requestId),
         MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
-      respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+      respond(new RpcFailure(req.requestId, JavaUtils.stackTraceToString(e)));
     } finally {
       req.body().release();
     }
@@ -199,7 +199,7 @@ public void onSuccess(ByteBuffer response) {
 
       @Override
       public void onFailure(Throwable e) {
-        respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+        respond(new RpcFailure(req.requestId, JavaUtils.stackTraceToString(e)));
       }
     };
     TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
@@ -266,7 +266,7 @@ public String getID() {
       logger.error("Error while invoking RpcHandler#receive() on RPC id {} from {}", e,
         MDC.of(LogKeys.REQUEST_ID, req.requestId),
         MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
-      respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+      respond(new RpcFailure(req.requestId, JavaUtils.stackTraceToString(e)));
     }
     // We choose to totally fail the channel, rather than trying to recover as we do in other
     // cases. We don't know how many bytes of the stream the client has already sent for the
@@ -302,7 +302,7 @@ public void onSuccess(int numChunks, ManagedBuffer buffer) {
         @Override
         public void onFailure(Throwable e) {
           logger.trace("Failed to send meta for {}", req);
-          respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+          respond(new RpcFailure(req.requestId, JavaUtils.stackTraceToString(e)));
         }
       });
     } catch (Exception e) {
@@ -311,7 +311,7 @@ public void onFailure(Throwable e) {
         MDC.of(LogKeys.SHUFFLE_ID, req.shuffleId),
         MDC.of(LogKeys.REDUCE_ID, req.reduceId),
         MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
-      respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+      respond(new RpcFailure(req.requestId, JavaUtils.stackTraceToString(e)));
     }
   }
 
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
index 31ed10ad76f8f..298611cc8567f 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
@@ -20,10 +20,9 @@
 import java.io.FileNotFoundException;
 import java.net.ConnectException;
 
-import com.google.common.base.Throwables;
-
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.network.server.BlockPushNonFatalFailure;
+import org.apache.spark.network.util.JavaUtils;
 
 /**
  * Plugs into {@link RetryingBlockTransferor} to further control when an exception should be retried
@@ -105,12 +104,12 @@ class BlockFetchErrorHandler implements ErrorHandler {
 
     @Override
     public boolean shouldRetryError(Throwable t) {
-      return !Throwables.getStackTraceAsString(t).contains(STALE_SHUFFLE_BLOCK_FETCH);
+      return !JavaUtils.stackTraceToString(t).contains(STALE_SHUFFLE_BLOCK_FETCH);
     }
 
     @Override
     public boolean shouldLogError(Throwable t) {
-      return !Throwables.getStackTraceAsString(t).contains(STALE_SHUFFLE_BLOCK_FETCH);
+      return !JavaUtils.stackTraceToString(t).contains(STALE_SHUFFLE_BLOCK_FETCH);
     }
   }
 }
diff --git a/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java b/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
index 3a2485520c664..27c9cfc215e5e 100644
--- a/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -643,4 +643,17 @@ public static String join(List<String> arr, String sep) {
     }
     return joiner.toString();
   }
+
+  public static String stackTraceToString(Throwable t) {
+    if (t == null) {
+      return "";
+    }
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try (PrintWriter writer = new PrintWriter(out)) {
+      t.printStackTrace(writer);
+      writer.flush();
+    }
+    return out.toString(StandardCharsets.UTF_8);
+  }
 }
diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala
index c4a2856e5fa63..2f16c90ad7149 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala
@@ -16,14 +16,14 @@
  */
 package org.apache.spark.util
 
-import java.io.{Closeable, IOException, PrintWriter}
-import java.nio.charset.StandardCharsets.UTF_8
+import java.io.{Closeable, IOException}
 
 import scala.annotation.tailrec
 import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import org.apache.spark.internal.{Logging, LogKeys}
+import org.apache.spark.network.util.JavaUtils
 
 private[spark] trait SparkErrorUtils extends Logging {
   /**
@@ -103,18 +103,7 @@ private[spark] trait SparkErrorUtils extends Logging {
     }
   }
 
-  def stackTraceToString(t: Throwable): String = {
-    Option(t) match {
-      case None => ""
-      case Some(throwable) =>
-        val out = new java.io.ByteArrayOutputStream
-        SparkErrorUtils.tryWithResource(new PrintWriter(out)) { writer =>
-          throwable.printStackTrace(writer)
-          writer.flush()
-        }
-        new String(out.toByteArray, UTF_8)
-    }
-  }
+  def stackTraceToString(t: Throwable): String = JavaUtils.stackTraceToString(t)
 
   /**
    * Walks the [[Throwable]] to obtain its root cause.
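Note: the new JavaUtils.stackTraceToString keeps the semantics of the Scala implementation it replaces: a null argument yields "", printStackTrace renders the full "Caused by:" chain, and the buffer is decoded as UTF-8 (the Charset overload of ByteArrayOutputStream.toString needs Java 10+). A standalone sketch of those semantics using only JDK classes; the demo class and main method are illustrative, not part of this patch:

    import java.io.ByteArrayOutputStream;
    import java.io.PrintWriter;
    import java.nio.charset.StandardCharsets;

    public final class StackTraceToStringDemo {
      // Same logic as the JavaUtils.stackTraceToString added above.
      static String stackTraceToString(Throwable t) {
        if (t == null) {
          return "";
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (PrintWriter writer = new PrintWriter(out)) {
          t.printStackTrace(writer);
          writer.flush();
        }
        return out.toString(StandardCharsets.UTF_8);
      }

      public static void main(String[] args) {
        // Null-safe: returns an empty string rather than throwing.
        System.out.println(stackTraceToString(null).isEmpty());            // true
        // The rendered trace includes the "Caused by:" chain, matching
        // what Guava's Throwables.getStackTraceAsString produced.
        Throwable t = new IllegalStateException("outer",
            new RuntimeException("inner cause"));
        System.out.println(stackTraceToString(t).contains("inner cause")); // true
      }
    }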
diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
index ac0d26edd1937..8d2c52cd9ed8d 100644
--- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -40,6 +40,7 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskContext;
 import org.apache.spark.TaskContext$;
+import org.apache.spark.network.util.JavaUtils;
 import scala.Tuple2;
 import scala.Tuple3;
 import scala.Tuple4;
@@ -47,7 +48,6 @@
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
-import com.google.common.base.Throwables;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
@@ -1503,7 +1503,7 @@ public void testAsyncActionErrorWrapping() throws Exception {
     JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<>()).countAsync();
     ExecutionException ee = assertThrows(ExecutionException.class,
       () -> future.get(2, TimeUnit.SECONDS));
-    assertTrue(Throwables.getStackTraceAsString(ee).contains("Custom exception!"));
+    assertTrue(JavaUtils.stackTraceToString(ee).contains("Custom exception!"));
     assertTrue(future.isDone());
   }
 
diff --git a/dev/checkstyle.xml b/dev/checkstyle.xml
index 1f877027f5b7b..d329d30037a6c 100644
--- a/dev/checkstyle.xml
+++ b/dev/checkstyle.xml
@@ -273,6 +273,10 @@
+    <module name="RegexpSinglelineJava">
+        <property name="format" value="Throwables\.getStackTraceAsString"/>
+        <property name="message" value="Use stackTraceToString of JavaUtils instead."/>
+    </module>
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 4c0439b73c731..91ce296c313b6 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -486,7 +486,7 @@ This file is divided into 3 sections:
   <check customId="exceptionutilsgetstacktrace" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter name="regex">\bExceptionUtils\.getStackTrace\b</parameter></parameters>
-    <customMessage>Use stackTraceToString of SparkErrorUtils or Utils instead</customMessage>
+    <customMessage>Use stackTraceToString of JavaUtils/SparkErrorUtils/Utils instead.</customMessage>
   </check>
@@ -817,6 +817,11 @@ This file is divided into 3 sections:
     <customMessage>Use Java APIs (like java.util.Base64) instead.</customMessage>
   </check>
 
+  <check customId="throwablesgetstacktraceasstring" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">\bThrowables\.getStackTraceAsString\b</parameter></parameters>
+    <customMessage>Use stackTraceToString of JavaUtils/SparkErrorUtils/Utils instead.</customMessage>
+  </check>
+
   <check customId="preconditionschecknotnull" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter name="regex">\bPreconditions\.checkNotNull\b</parameter></parameters>
     <customMessage>Use requireNonNull of java.util.Objects instead.</customMessage>
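The checkstyle and scalastyle additions above are plain regex bans. A quick standalone check (class name illustrative) that the pattern flags the Guava call but not its replacement:

    import java.util.regex.Pattern;

    public final class BanRegexCheck {
      public static void main(String[] args) {
        // The exact regex added to scalastyle-config.xml above.
        Pattern banned = Pattern.compile("\\bThrowables\\.getStackTraceAsString\\b");
        System.out.println(
            banned.matcher("Throwables.getStackTraceAsString(e)").find()); // true: flagged
        System.out.println(
            banned.matcher("JavaUtils.stackTraceToString(e)").find());     // false: allowed
      }
    }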
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 6a997ede2b7e3..2f0e281040e92 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
 
-import com.google.common.base.Throwables
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{SparkEnv, SparkException}
@@ -35,7 +34,7 @@ import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.Time
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.streaming.util.WriteAheadLogUtils
-import org.apache.spark.util.RpcUtils
+import org.apache.spark.util.{RpcUtils, Utils}
 
 /**
  * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -168,7 +167,7 @@ private[streaming] class ReceiverSupervisorImpl(
 
   /** Report error to the receiver tracker */
   def reportError(message: String, error: Throwable): Unit = {
-    val errorString = Option(error).map(Throwables.getStackTraceAsString).getOrElse("")
+    val errorString = Option(error).map(Utils.stackTraceToString).getOrElse("")
     trackerEndpoint.send(ReportError(streamId, message, errorString))
     logWarning(log"Reported error ${MDC(MESSAGE, message)} - ${MDC(ERROR, error)}")
   }
@@ -196,7 +195,7 @@ private[streaming] class ReceiverSupervisorImpl(
 
   override protected def onReceiverStop(message: String, error: Option[Throwable]): Unit = {
     logInfo(log"Deregistering receiver ${MDC(LogKeys.STREAM_ID, streamId)}")
-    val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
+    val errorString = error.map(Utils.stackTraceToString).getOrElse("")
     trackerEndpoint.askSync[Boolean](DeregisterReceiver(streamId, message, errorString))
     logInfo(log"Stopped receiver ${MDC(LogKeys.STREAM_ID, streamId)}")
   }

From c612b044ca3fb8dd35484255a0eb821dd6f15769 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Mon, 11 Aug 2025 14:15:19 +0800
Subject: [PATCH 2/3] import

---
 core/src/test/java/test/org/apache/spark/JavaAPISuite.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
index 8d2c52cd9ed8d..19f1c7598c4c6 100644
--- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -40,7 +40,7 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskContext;
 import org.apache.spark.TaskContext$;
-import org.apache.spark.network.util.JavaUtils;
+
 import scala.Tuple2;
 import scala.Tuple3;
 import scala.Tuple4;
@@ -68,6 +68,7 @@
 import org.apache.spark.api.java.Optional;
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.input.PortableDataStream;
+import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.partial.BoundedDouble;
 import org.apache.spark.partial.PartialResult;
 import org.apache.spark.rdd.RDD;

From 8d231984c6c5184e33b1cffda2ac64e7f37574cd Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Mon, 11 Aug 2025 14:16:26 +0800
Subject: [PATCH 3/3] import

---
 core/src/test/java/test/org/apache/spark/JavaAPISuite.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
index 19f1c7598c4c6..f2f4101877e51 100644
--- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -40,7 +40,6 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskContext;
 import org.apache.spark.TaskContext$;
-
 import scala.Tuple2;
 import scala.Tuple3;
 import scala.Tuple4;
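With the series applied, downstream call sites follow one pattern: drop the com.google.common.base.Throwables import and call the shared helper. A hypothetical call site, assuming the Spark common-utils module is on the classpath; before this series it would have called Throwables.getStackTraceAsString(e), which the style rules added in PATCH 1/3 now reject:

    import org.apache.spark.network.util.JavaUtils;

    // Hypothetical post-migration call site (not part of the patch).
    final class ErrorReporting {
      static String render(Throwable e) {
        // Null-safe and renders the full cause chain, like the Guava call it replaces.
        return JavaUtils.stackTraceToString(e);
      }
    }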