
Commit 8d5e602

LuciferYang authored and dongjoon-hyun committed
[SPARK-53242][CORE][DSTREAM] Move stackTraceToString to JavaUtils and use it to replace Throwables.getStackTraceAsString
### What changes were proposed in this pull request?

This PR moves `stackTraceToString` to `JavaUtils` and uses it to replace `Throwables.getStackTraceAsString`.

### Why are the changes needed?

Reuse Spark's existing error handling functions.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #51969 from LuciferYang/JavaUtils-stackTraceToString.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
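For reference, the substitution across the diff is mechanical. A minimal sketch of the call-site change (the demo class and exception message are illustrative, not taken from this PR):

```java
import org.apache.spark.network.util.JavaUtils;

public class StackTraceDemo {
  public static void main(String[] args) {
    try {
      throw new IllegalStateException("boom");
    } catch (Exception e) {
      // Before this PR: com.google.common.base.Throwables.getStackTraceAsString(e)
      // After: Spark's own helper, now hosted in JavaUtils
      String trace = JavaUtils.stackTraceToString(e);
      System.out.println(trace.contains("boom")); // true
    }
  }
}
```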
1 parent 73f8a84 commit 8d5e602

File tree

9 files changed (+44, -35 lines)


common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java

Lines changed: 2 additions & 2 deletions
@@ -19,7 +19,6 @@

 import java.net.SocketAddress;

-import com.google.common.base.Throwables;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
@@ -36,6 +35,7 @@
 import org.apache.spark.network.protocol.ChunkFetchRequest;
 import org.apache.spark.network.protocol.ChunkFetchSuccess;
 import org.apache.spark.network.protocol.Encodable;
+import org.apache.spark.network.util.JavaUtils;

 import static org.apache.spark.network.util.NettyUtils.*;

@@ -114,7 +114,7 @@ public void processFetchRequest(
         MDC.of(LogKeys.STREAM_CHUNK_ID, msg.streamChunkId),
         MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
       respond(channel, new ChunkFetchFailure(msg.streamChunkId,
-        Throwables.getStackTraceAsString(e)));
+        JavaUtils.stackTraceToString(e)));
       return;
     }

common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java

Lines changed: 8 additions & 8 deletions
@@ -21,7 +21,6 @@
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;

-import com.google.common.base.Throwables;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;

@@ -33,6 +32,7 @@
 import org.apache.spark.network.buffer.NioManagedBuffer;
 import org.apache.spark.network.client.*;
 import org.apache.spark.network.protocol.*;
+import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.TransportFrameDecoder;

 import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
@@ -145,7 +145,7 @@ private void processStreamRequest(final StreamRequest req) {
       logger.error("Error opening stream {} for request from {}", e,
         MDC.of(LogKeys.STREAM_ID, req.streamId),
         MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
-      respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
+      respond(new StreamFailure(req.streamId, JavaUtils.stackTraceToString(e)));
       return;
     }

@@ -172,14 +172,14 @@ public void onSuccess(ByteBuffer response) {

         @Override
         public void onFailure(Throwable e) {
-          respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+          respond(new RpcFailure(req.requestId, JavaUtils.stackTraceToString(e)));
         }
       });
     } catch (Exception e) {
       logger.error("Error while invoking RpcHandler#receive() on RPC id {} from {}", e,
         MDC.of(LogKeys.REQUEST_ID, req.requestId),
         MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
-      respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+      respond(new RpcFailure(req.requestId, JavaUtils.stackTraceToString(e)));
     } finally {
       req.body().release();
     }
@@ -199,7 +199,7 @@ public void onSuccess(ByteBuffer response) {

         @Override
         public void onFailure(Throwable e) {
-          respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+          respond(new RpcFailure(req.requestId, JavaUtils.stackTraceToString(e)));
         }
       };
       TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
@@ -266,7 +266,7 @@ public String getID() {
         logger.error("Error while invoking RpcHandler#receive() on RPC id {} from {}", e,
           MDC.of(LogKeys.REQUEST_ID, req.requestId),
           MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
-        respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+        respond(new RpcFailure(req.requestId, JavaUtils.stackTraceToString(e)));
       }
       // We choose to totally fail the channel, rather than trying to recover as we do in other
       // cases. We don't know how many bytes of the stream the client has already sent for the
@@ -302,7 +302,7 @@ public void onSuccess(int numChunks, ManagedBuffer buffer) {
         @Override
         public void onFailure(Throwable e) {
           logger.trace("Failed to send meta for {}", req);
-          respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+          respond(new RpcFailure(req.requestId, JavaUtils.stackTraceToString(e)));
         }
       });
     } catch (Exception e) {
@@ -311,7 +311,7 @@ public void onFailure(Throwable e) {
         MDC.of(LogKeys.SHUFFLE_ID, req.shuffleId),
         MDC.of(LogKeys.REDUCE_ID, req.reduceId),
         MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
-      respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+      respond(new RpcFailure(req.requestId, JavaUtils.stackTraceToString(e)));
     }
   }

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java

Lines changed: 3 additions & 4 deletions
@@ -20,10 +20,9 @@
 import java.io.FileNotFoundException;
 import java.net.ConnectException;

-import com.google.common.base.Throwables;
-
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.network.server.BlockPushNonFatalFailure;
+import org.apache.spark.network.util.JavaUtils;

 /**
  * Plugs into {@link RetryingBlockTransferor} to further control when an exception should be retried
@@ -105,12 +104,12 @@ class BlockFetchErrorHandler implements ErrorHandler {

     @Override
     public boolean shouldRetryError(Throwable t) {
-      return !Throwables.getStackTraceAsString(t).contains(STALE_SHUFFLE_BLOCK_FETCH);
+      return !JavaUtils.stackTraceToString(t).contains(STALE_SHUFFLE_BLOCK_FETCH);
     }

     @Override
     public boolean shouldLogError(Throwable t) {
-      return !Throwables.getStackTraceAsString(t).contains(STALE_SHUFFLE_BLOCK_FETCH);
+      return !JavaUtils.stackTraceToString(t).contains(STALE_SHUFFLE_BLOCK_FETCH);
     }
   }
 }

common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java

Lines changed: 13 additions & 0 deletions
@@ -643,4 +643,17 @@ public static String join(List<Object> arr, String sep) {
     }
     return joiner.toString();
   }
+
+  public static String stackTraceToString(Throwable t) {
+    if (t == null) {
+      return "";
+    }
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try (PrintWriter writer = new PrintWriter(out)) {
+      t.printStackTrace(writer);
+      writer.flush();
+    }
+    return out.toString(StandardCharsets.UTF_8);
+  }
 }
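The new Java method preserves the behavior of the Scala original it replaces (see SparkErrorUtils.scala below): null yields an empty string, and otherwise the trace is printed through a PrintWriter into a ByteArrayOutputStream and decoded as UTF-8. A hedged usage sketch (the demo class is hypothetical, not part of the diff):

```java
import org.apache.spark.network.util.JavaUtils;

public class NullSafetyCheck {
  public static void main(String[] args) {
    // Guava's Throwables.getStackTraceAsString would throw NPE on null input;
    // the JavaUtils helper returns "" instead.
    System.out.println(JavaUtils.stackTraceToString(null).isEmpty()); // true

    String trace = JavaUtils.stackTraceToString(new RuntimeException("boom"));
    System.out.println(trace.startsWith("java.lang.RuntimeException: boom")); // true
  }
}
```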

common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala

Lines changed: 3 additions & 14 deletions
@@ -16,14 +16,14 @@
  */
 package org.apache.spark.util

-import java.io.{Closeable, IOException, PrintWriter}
-import java.nio.charset.StandardCharsets.UTF_8
+import java.io.{Closeable, IOException}

 import scala.annotation.tailrec
 import scala.collection.mutable
 import scala.util.control.NonFatal

 import org.apache.spark.internal.{Logging, LogKeys}
+import org.apache.spark.network.util.JavaUtils

 private[spark] trait SparkErrorUtils extends Logging {
   /**
@@ -103,18 +103,7 @@ private[spark] trait SparkErrorUtils extends Logging {
     }
   }

-  def stackTraceToString(t: Throwable): String = {
-    Option(t) match {
-      case None => ""
-      case Some(throwable) =>
-        val out = new java.io.ByteArrayOutputStream
-        SparkErrorUtils.tryWithResource(new PrintWriter(out)) { writer =>
-          throwable.printStackTrace(writer)
-          writer.flush()
-        }
-        new String(out.toByteArray, UTF_8)
-    }
-  }
+  def stackTraceToString(t: Throwable): String = JavaUtils.stackTraceToString(t)

   /**
    * Walks the [[Throwable]] to obtain its root cause.

core/src/test/java/test/org/apache/spark/JavaAPISuite.java

Lines changed: 2 additions & 2 deletions
@@ -47,7 +47,6 @@

 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
-import com.google.common.base.Throwables;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
@@ -68,6 +67,7 @@
 import org.apache.spark.api.java.Optional;
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.input.PortableDataStream;
+import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.partial.BoundedDouble;
 import org.apache.spark.partial.PartialResult;
 import org.apache.spark.rdd.RDD;
@@ -1503,7 +1503,7 @@ public void testAsyncActionErrorWrapping() throws Exception {
     JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<>()).countAsync();
     ExecutionException ee = assertThrows(ExecutionException.class,
       () -> future.get(2, TimeUnit.SECONDS));
-    assertTrue(Throwables.getStackTraceAsString(ee).contains("Custom exception!"));
+    assertTrue(JavaUtils.stackTraceToString(ee).contains("Custom exception!"));
     assertTrue(future.isDone());
   }

dev/checkstyle.xml

Lines changed: 4 additions & 0 deletions
@@ -275,6 +275,10 @@
     <property name="format" value="ImmutableSet\.of"/>
     <property name="message" value="Use Set.of instead." />
   </module>
+  <module name="RegexpSinglelineJava">
+    <property name="format" value="Throwables\.getStackTraceAsString"/>
+    <property name="message" value="Use stackTraceToString of JavaUtils/SparkFileUtils/Utils instead." />
+  </module>
   <!-- support structured logging -->
   <module name="RegexpSinglelineJava">
     <property name="format" value="org\.slf4j\.(Logger|LoggerFactory)" />
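Note that checkstyle's RegexpSinglelineJava matches raw source lines (comments included, unless ignoreComments is set), so any remaining spelling of the Guava call in Java sources now fails the style check. A hedged sketch of a compliant call site (class and method names are made up, not from this diff):

```java
import org.apache.spark.network.util.JavaUtils;

class StyleRuleExample {
  String describe(Throwable e) {
    // The spelling Throwables.getStackTraceAsString now trips the new rule
    // wherever it appears in Java sources.
    return JavaUtils.stackTraceToString(e); // the sanctioned replacement
  }
}
```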

scalastyle-config.xml

Lines changed: 6 additions & 1 deletion
@@ -486,7 +486,7 @@ This file is divided into 3 sections:

   <check customId="commonslang3getstacktrace" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter name="regex">\bExceptionUtils\.getStackTrace\b</parameter></parameters>
-    <customMessage>Use stackTraceToString of SparkErrorUtils or Utils instead</customMessage>
+    <customMessage>Use stackTraceToString of JavaUtils/SparkFileUtils/Utils instead.</customMessage>
   </check>

   <check customId="commonslang3strings" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
@@ -817,6 +817,11 @@ This file is divided into 3 sections:
     <customMessage>Use Java APIs (like java.util.Base64) instead.</customMessage>
   </check>

+  <check customId="googleThrowablesGetStackTraceAsString" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">\bThrowables\.getStackTraceAsString\b</parameter></parameters>
+    <customMessage>Use stackTraceToString of JavaUtils/SparkFileUtils/Utils instead.</customMessage>
+  </check>
+
   <check customId="preconditionschecknotnull" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter name="regex">\bPreconditions\.checkNotNull\b</parameter></parameters>
     <customMessage>Use requireNonNull of java.util.Objects instead.</customMessage>

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 3 additions & 4 deletions
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._

-import com.google.common.base.Throwables
 import org.apache.hadoop.conf.Configuration

 import org.apache.spark.{SparkEnv, SparkException}
@@ -35,7 +34,7 @@ import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.Time
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.streaming.util.WriteAheadLogUtils
-import org.apache.spark.util.RpcUtils
+import org.apache.spark.util.{RpcUtils, Utils}

 /**
  * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -168,7 +167,7 @@ private[streaming] class ReceiverSupervisorImpl(

   /** Report error to the receiver tracker */
   def reportError(message: String, error: Throwable): Unit = {
-    val errorString = Option(error).map(Throwables.getStackTraceAsString).getOrElse("")
+    val errorString = Option(error).map(Utils.stackTraceToString).getOrElse("")
     trackerEndpoint.send(ReportError(streamId, message, errorString))
     logWarning(log"Reported error ${MDC(MESSAGE, message)} - ${MDC(ERROR, error)}")
   }
@@ -196,7 +195,7 @@

   override protected def onReceiverStop(message: String, error: Option[Throwable]): Unit = {
     logInfo(log"Deregistering receiver ${MDC(LogKeys.STREAM_ID, streamId)}")
-    val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
+    val errorString = error.map(Utils.stackTraceToString).getOrElse("")
     trackerEndpoint.askSync[Boolean](DeregisterReceiver(streamId, message, errorString))
     logInfo(log"Stopped receiver ${MDC(LogKeys.STREAM_ID, streamId)}")
   }
