[SPARK-53242][CORE][DSTREAM] Move stackTraceToString to JavaUtils and use it to replace Throwables.getStackTraceAsString #51969

Closed

@@ -19,7 +19,6 @@

 import java.net.SocketAddress;

-import com.google.common.base.Throwables;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
@@ -36,6 +35,7 @@
 import org.apache.spark.network.protocol.ChunkFetchRequest;
 import org.apache.spark.network.protocol.ChunkFetchSuccess;
 import org.apache.spark.network.protocol.Encodable;
+import org.apache.spark.network.util.JavaUtils;

 import static org.apache.spark.network.util.NettyUtils.*;

@@ -114,7 +114,7 @@ public void processFetchRequest(
         MDC.of(LogKeys.STREAM_CHUNK_ID, msg.streamChunkId),
         MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
       respond(channel, new ChunkFetchFailure(msg.streamChunkId,
-        Throwables.getStackTraceAsString(e)));
+        JavaUtils.stackTraceToString(e)));
       return;
     }

@@ -21,7 +21,6 @@
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;

-import com.google.common.base.Throwables;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;

@@ -33,6 +32,7 @@
 import org.apache.spark.network.buffer.NioManagedBuffer;
 import org.apache.spark.network.client.*;
 import org.apache.spark.network.protocol.*;
+import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.TransportFrameDecoder;

 import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
@@ -145,7 +145,7 @@ private void processStreamRequest(final StreamRequest req) {
       logger.error("Error opening stream {} for request from {}", e,
         MDC.of(LogKeys.STREAM_ID, req.streamId),
         MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
-      respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
+      respond(new StreamFailure(req.streamId, JavaUtils.stackTraceToString(e)));
       return;
     }

@@ -172,14 +172,14 @@ public void onSuccess(ByteBuffer response) {

         @Override
         public void onFailure(Throwable e) {
-          respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+          respond(new RpcFailure(req.requestId, JavaUtils.stackTraceToString(e)));
         }
       });
     } catch (Exception e) {
       logger.error("Error while invoking RpcHandler#receive() on RPC id {} from {}", e,
         MDC.of(LogKeys.REQUEST_ID, req.requestId),
         MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
-      respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+      respond(new RpcFailure(req.requestId, JavaUtils.stackTraceToString(e)));
     } finally {
       req.body().release();
     }
@@ -199,7 +199,7 @@ public void onSuccess(ByteBuffer response) {

       @Override
       public void onFailure(Throwable e) {
-        respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+        respond(new RpcFailure(req.requestId, JavaUtils.stackTraceToString(e)));
       }
     };
     TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
@@ -266,7 +266,7 @@ public String getID() {
       logger.error("Error while invoking RpcHandler#receive() on RPC id {} from {}", e,
         MDC.of(LogKeys.REQUEST_ID, req.requestId),
         MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
-      respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+      respond(new RpcFailure(req.requestId, JavaUtils.stackTraceToString(e)));
     }
     // We choose to totally fail the channel, rather than trying to recover as we do in other
     // cases. We don't know how many bytes of the stream the client has already sent for the
@@ -302,7 +302,7 @@ public void onSuccess(int numChunks, ManagedBuffer buffer) {
       @Override
       public void onFailure(Throwable e) {
         logger.trace("Failed to send meta for {}", req);
-        respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+        respond(new RpcFailure(req.requestId, JavaUtils.stackTraceToString(e)));
       }
     });
   } catch (Exception e) {
@@ -311,7 +311,7 @@ public void onFailure(Throwable e) {
       MDC.of(LogKeys.SHUFFLE_ID, req.shuffleId),
       MDC.of(LogKeys.REDUCE_ID, req.reduceId),
       MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
-    respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+    respond(new RpcFailure(req.requestId, JavaUtils.stackTraceToString(e)));
   }
 }

@@ -20,10 +20,9 @@
 import java.io.FileNotFoundException;
 import java.net.ConnectException;

-import com.google.common.base.Throwables;
-
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.network.server.BlockPushNonFatalFailure;
+import org.apache.spark.network.util.JavaUtils;

 /**
  * Plugs into {@link RetryingBlockTransferor} to further control when an exception should be retried
@@ -105,12 +104,12 @@ class BlockFetchErrorHandler implements ErrorHandler {

     @Override
     public boolean shouldRetryError(Throwable t) {
-      return !Throwables.getStackTraceAsString(t).contains(STALE_SHUFFLE_BLOCK_FETCH);
+      return !JavaUtils.stackTraceToString(t).contains(STALE_SHUFFLE_BLOCK_FETCH);
     }

     @Override
     public boolean shouldLogError(Throwable t) {
-      return !Throwables.getStackTraceAsString(t).contains(STALE_SHUFFLE_BLOCK_FETCH);
+      return !JavaUtils.stackTraceToString(t).contains(STALE_SHUFFLE_BLOCK_FETCH);
     }
   }
 }
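
Why match on the printed stack trace rather than getMessage()? The stale-block marker may live on a nested cause, and only the full trace renders the "Caused by:" chain. A minimal sketch of the mechanism (the marker string below is a stand-in; the real STALE_SHUFFLE_BLOCK_FETCH constant's value is outside this excerpt):

    import java.io.IOException;
    import org.apache.spark.network.util.JavaUtils;

    class CauseChainMatchingExample {
      public static void main(String[] args) {
        // Stand-in for the real STALE_SHUFFLE_BLOCK_FETCH marker value.
        String marker = "stale-shuffle-block";

        // The marker sits on a nested cause, not on the top-level message...
        Throwable wrapped = new IOException("fetch failed", new RuntimeException(marker));

        // ...so getMessage() misses it, but the printed trace includes every
        // "Caused by:" section, which is what shouldRetryError/shouldLogError rely on.
        System.out.println(wrapped.getMessage().contains(marker));                  // false
        System.out.println(JavaUtils.stackTraceToString(wrapped).contains(marker)); // true
      }
    }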

@@ -643,4 +643,17 @@ public static String join(List<Object> arr, String sep) {
     }
     return joiner.toString();
   }
+
+  public static String stackTraceToString(Throwable t) {
+    if (t == null) {
+      return "";
+    }
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try (PrintWriter writer = new PrintWriter(out)) {
+      t.printStackTrace(writer);
+      writer.flush();
+    }
+    return out.toString(StandardCharsets.UTF_8);
+  }
 }
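
The new Java helper preserves the old Scala semantics from SparkErrorUtils (see below): a null Throwable yields an empty string, whereas Guava's Throwables.getStackTraceAsString would throw a NullPointerException. A minimal, illustrative sketch (this example class is not part of the patch):

    import org.apache.spark.network.util.JavaUtils;

    public class StackTraceToStringExample {
      public static void main(String[] args) {
        // Typical path: like Guava's helper, the result starts with the
        // exception's toString() and includes the full stack trace.
        String trace = JavaUtils.stackTraceToString(new IllegalStateException("boom"));
        System.out.println(trace.startsWith("java.lang.IllegalStateException: boom")); // true

        // Null-tolerant path: Guava would throw an NPE here; the helper returns "".
        System.out.println(JavaUtils.stackTraceToString(null).isEmpty()); // true
      }
    }

Note that ByteArrayOutputStream.toString(Charset) requires Java 10+, which is safe on Spark's current Java 17+ baseline.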

@@ -16,14 +16,14 @@
  */
 package org.apache.spark.util

-import java.io.{Closeable, IOException, PrintWriter}
-import java.nio.charset.StandardCharsets.UTF_8
+import java.io.{Closeable, IOException}

 import scala.annotation.tailrec
 import scala.collection.mutable
 import scala.util.control.NonFatal

 import org.apache.spark.internal.{Logging, LogKeys}
+import org.apache.spark.network.util.JavaUtils

 private[spark] trait SparkErrorUtils extends Logging {
   /**
@@ -103,18 +103,7 @@ private[spark] trait SparkErrorUtils extends Logging {
     }
   }

-  def stackTraceToString(t: Throwable): String = {
-    Option(t) match {
-      case None => ""
-      case Some(throwable) =>
-        val out = new java.io.ByteArrayOutputStream
-        SparkErrorUtils.tryWithResource(new PrintWriter(out)) { writer =>
-          throwable.printStackTrace(writer)
-          writer.flush()
-        }
-        new String(out.toByteArray, UTF_8)
-    }
-  }
+  def stackTraceToString(t: Throwable): String = JavaUtils.stackTraceToString(t)

   /**
    * Walks the [[Throwable]] to obtain its root cause.
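
With the Scala method reduced to a one-line delegation, existing call sites keep their signatures: Utils.stackTraceToString (used in the ReceiverSupervisorImpl change below) and the SparkFileUtils/Utils entry points named by the new style rules all resolve to the single JavaUtils implementation.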

4 changes: 2 additions & 2 deletions core/src/test/java/test/org/apache/spark/JavaAPISuite.java

@@ -47,7 +47,6 @@

 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
-import com.google.common.base.Throwables;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
@@ -68,6 +67,7 @@
 import org.apache.spark.api.java.Optional;
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.input.PortableDataStream;
+import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.partial.BoundedDouble;
 import org.apache.spark.partial.PartialResult;
 import org.apache.spark.rdd.RDD;
@@ -1503,7 +1503,7 @@ public void testAsyncActionErrorWrapping() throws Exception {
     JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<>()).countAsync();
     ExecutionException ee = assertThrows(ExecutionException.class,
       () -> future.get(2, TimeUnit.SECONDS));
-    assertTrue(Throwables.getStackTraceAsString(ee).contains("Custom exception!"));
+    assertTrue(JavaUtils.stackTraceToString(ee).contains("Custom exception!"));
     assertTrue(future.isDone());
   }

4 changes: 4 additions & 0 deletions dev/checkstyle.xml

@@ -273,6 +273,10 @@
         <property name="format" value="ImmutableSet\.of"/>
         <property name="message" value="Use Set.of instead." />
     </module>
+    <module name="RegexpSinglelineJava">
+        <property name="format" value="Throwables\.getStackTraceAsString"/>
+        <property name="message" value="Use stackTraceToString of JavaUtils/SparkFileUtils/Utils instead." />
+    </module>
     <!-- support structured logging -->
     <module name="RegexpSinglelineJava">
         <property name="format" value="org\.slf4j\.(Logger|LoggerFactory)" />
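
The checkstyle guard is a plain regex over source lines, so any Java line containing Throwables.getStackTraceAsString now fails the build with the message above (RegexpSinglelineJava scans comment lines too unless ignoreComments is set). A hypothetical before/after from a contributor's point of view:

    import org.apache.spark.network.util.JavaUtils;

    class MigrationExample {
      String describe(Throwable cause) {
        // Rejected by the new rule:
        //   return com.google.common.base.Throwables.getStackTraceAsString(cause);
        // Compliant replacement:
        return JavaUtils.stackTraceToString(cause);
      }
    }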

7 changes: 6 additions & 1 deletion scalastyle-config.xml

@@ -486,7 +486,7 @@ This file is divided into 3 sections:

  <check customId="commonslang3getstacktrace" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
    <parameters><parameter name="regex">\bExceptionUtils\.getStackTrace\b</parameter></parameters>
-   <customMessage>Use stackTraceToString of SparkErrorUtils or Utils instead</customMessage>
+   <customMessage>Use stackTraceToString of JavaUtils/SparkFileUtils/Utils instead.</customMessage>
  </check>

  <check customId="commonslang3strings" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
@@ -817,6 +817,11 @@
    <customMessage>Use Java APIs (like java.util.Base64) instead.</customMessage>
  </check>

+ <check customId="googleThrowablesGetStackTraceAsString" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+   <parameters><parameter name="regex">\bThrowables\.getStackTraceAsString\b</parameter></parameters>
+   <customMessage>Use stackTraceToString of JavaUtils/SparkFileUtils/Utils instead.</customMessage>
+ </check>
+
  <check customId="preconditionschecknotnull" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
    <parameters><parameter name="regex">\bPreconditions\.checkNotNull\b</parameter></parameters>
    <customMessage>Use requireNonNull of java.util.Objects instead.</customMessage>

@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._

-import com.google.common.base.Throwables
 import org.apache.hadoop.conf.Configuration

 import org.apache.spark.{SparkEnv, SparkException}
@@ -35,7 +34,7 @@ import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.Time
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.streaming.util.WriteAheadLogUtils
-import org.apache.spark.util.RpcUtils
+import org.apache.spark.util.{RpcUtils, Utils}

 /**
  * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -168,7 +167,7 @@

   /** Report error to the receiver tracker */
   def reportError(message: String, error: Throwable): Unit = {
-    val errorString = Option(error).map(Throwables.getStackTraceAsString).getOrElse("")
+    val errorString = Option(error).map(Utils.stackTraceToString).getOrElse("")
     trackerEndpoint.send(ReportError(streamId, message, errorString))
     logWarning(log"Reported error ${MDC(MESSAGE, message)} - ${MDC(ERROR, error)}")
   }
@@ -196,7 +195,7 @@

   override protected def onReceiverStop(message: String, error: Option[Throwable]): Unit = {
     logInfo(log"Deregistering receiver ${MDC(LogKeys.STREAM_ID, streamId)}")
-    val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
+    val errorString = error.map(Utils.stackTraceToString).getOrElse("")
     trackerEndpoint.askSync[Boolean](DeregisterReceiver(streamId, message, errorString))
     logInfo(log"Stopped receiver ${MDC(LogKeys.STREAM_ID, streamId)}")
   }