
[BUG]: Applying UDFs to TimestampType causes occasional exception #760


Closed
akamor opened this issue Oct 29, 2020 · 26 comments · Fixed by #868

@akamor

akamor commented Oct 29, 2020

Describe the bug
When applying multiple UDFs to a DataFrame, an error is thrown when at least one of the target columns of the UDFs is a TimestampType column.

To Reproduce

Below is a minimal reproduction you can use. It includes a comment on how to create the necessary parquet file; the file must have 2 timestamp columns and a sufficient number of rows (~2k) for the issue to occur. The repro is launched via the command below:

spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.2 --master local .\microsoft-spark-3-0_2.12-1.0.0.jar dotnet .\spark_cache_bug.dll
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;

using static Microsoft.Spark.Sql.Functions;

namespace spark_cache_bug
{
    public static class SparkGenerator
    {
        public static Func<Column, Column> Generate(StructType returnType, string column) =>
            Udf<Row>(row => new GenericRow(new string [] {"1970-01-02 00:00:00"}), returnType);
    }

    public static class Program
    {
        private static void Generate(Dictionary<string, Column> expressionMap, string column, DataFrame df)
        {
            var typeMapper = new Dictionary<string, DataType>();
            foreach (var f in df.Schema().Fields)
            {
                typeMapper.Add(f.Name, f.DataType);
            }

            var returnType = new StructType(new[] {new StructField(column, new StringType())});

            var udf = SparkGenerator.Generate(returnType,column);

            var newCol = udf(Struct(expressionMap[column]));
            expressionMap[column] = newCol.GetField(column).Cast(typeMapper[column].SimpleString).Alias(column);

        }

        /*
         * To create the parquet file for testing.
         * spark-shell --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.2   (assuming you are writing file to S3)
         * import java.sql.Timestamp
         * val df = sc.parallelize( Seq.fill(10000){(new Timestamp(0), new Timestamp(0))}).toDF("timestamp1","timestamp2")
         * df.write.parquet("...")
         *
         * this creates a parquet file with 2 columns called timestamp1 and timestamp2.  They are both of type TimestampType
         */

        private static void Main(string[] args)
        {

            var spark = SparkSession.Builder().GetOrCreate();
            spark.Conf().Set("fs.s3a.access.key", "<AWS ACCESS ID>");
            spark.Conf().Set("fs.s3a.secret.key", "<AWS ACCESS SECRET KEY");
            var sourcePath = "s3a://<PATH TO FILE>";
            var outputPath = "s3a://<DESIRED OUTPUT LOCATION>";

            var df = spark.Read().Parquet(sourcePath);

            var expressionMap = df.Schema().Fields.ToDictionary(f => f.Name, f => df.Col(f.Name));

            Generate(expressionMap, "timestamp1", df);
            Generate(expressionMap, "timestamp2", df);

            df = df.Select(df.Schema().Fields.Select(f => expressionMap[f.Name]).ToArray());
            df.Write().Mode("overwrite").Parquet(outputPath);
        }
    }
}

The exception thrown is:

Caused by: org.apache.spark.api.python.PythonException: System.InvalidCastException: Unable to cast object of type 'Microsoft.Spark.Sql.Types.Timestamp' to type 'System.Int32'.
   at Microsoft.Spark.Sql.Types.TimestampType.FromInternal(Object obj) in /_/src/csharp/Microsoft.Spark/Sql/Types/SimpleTypes.cs:line 116
   at Microsoft.Spark.Sql.Row.Convert() in /_/src/csharp/Microsoft.Spark/Sql/Row.cs:line 152
   at Microsoft.Spark.Sql.Row..ctor(Object[] values, StructType schema) in /_/src/csharp/Microsoft.Spark/Sql/Row.cs:line 36
   at Microsoft.Spark.Sql.RowConstructor.GetRow() in /_/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs:line 104
   at Microsoft.Spark.Sql.RowConstructor.GetRow() in /_/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs:line 100
   at Microsoft.Spark.Worker.Command.PicklingSqlCommandExecutor.ExecuteCore(Stream inputStream, Stream outputStream, SqlCommand[] commands) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 146
   at Microsoft.Spark.Worker.Command.SqlCommandExecutor.Execute(Version version, Stream inputStream, Stream outputStream, PythonEvalType evalType, SqlCommand[] commands) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Command\SqlCommandExecutor.cs:line 76
   at Microsoft.Spark.Worker.Command.CommandExecutor.Execute(Stream inputStream, Stream outputStream, Int32 splitIndex, CommandPayload commandPayload) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Command\CommandExecutor.cs:line 65
   at Microsoft.Spark.Worker.TaskRunner.ProcessStream(Stream inputStream, Stream outputStream, Version version, Boolean& readComplete) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\TaskRunner.cs:line 154
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
        at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
        at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:272)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
        ... 9 more

Our investigation
After investigating a bit, it appears to be because a given row's TimestampType column is sometimes passed to TimestampType.FromInternal multiple times (once per UDF call). The first time FromInternal is called it properly creates a new Timestamp object. The second time, however, that same Timestamp object is passed into the function again, even though the function expects an int or long. The culprit might be RowConstructor.GetRow, which appears to possibly re-use _args. When I modify GetRow to first make a copy of _args and pass the copy to the Row constructor, the issue goes away.
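
A minimal, self-contained sketch of this behavior (illustrative only; it does not use the actual Microsoft.Spark internals, and AliasingDemo/ConvertTimestampInPlace are made-up names). It models FromInternal as an in-place conversion of a shared values array, shows why a second pass throws, and shows why copying the array first avoids it:

using System;

// Illustrative stand-in for the suspected bug: the pickled values array (_args)
// is shared and converted in place, so a second conversion pass sees an already
// converted value instead of the raw long and throws InvalidCastException.
internal static class AliasingDemo
{
    // Models TimestampType.FromInternal applied in place to one column of a values array.
    private static void ConvertTimestampInPlace(object[] values, int index)
    {
        long micros = (long)values[index]; // throws InvalidCastException if already converted
        values[index] = DateTimeOffset
            .FromUnixTimeMilliseconds(micros / 1000)
            .UtcDateTime;
    }

    private static void Main()
    {
        object[] shared = { 86_400_000_000L }; // raw internal value: 1970-01-02 00:00:00 UTC

        ConvertTimestampInPlace(shared, 0);     // first UDF pass: long -> DateTime, fine
        try
        {
            ConvertTimestampInPlace(shared, 0); // second UDF pass: sees a DateTime, cast fails
        }
        catch (InvalidCastException e)
        {
            Console.WriteLine($"Second pass failed: {e.Message}");
        }

        // The workaround described above: give each consumer its own copy of the array,
        // so the in-place conversion done for one row never leaks into another.
        object[] original = { 86_400_000_000L };
        ConvertTimestampInPlace((object[])original.Clone(), 0);
        ConvertTimestampInPlace((object[])original.Clone(), 0); // no exception this time
        Console.WriteLine("With defensive copies both passes succeed.");
    }
}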

Desktop (please complete the following information):

  • Windows 10
  • Chrome
  • v1.0.0 running Spark 3

Additional context
This error won't always occur but occurs with high probability if the dataframe has at least 2k rows.

@akamor added the bug label Oct 29, 2020
@akamor
Author

akamor commented Oct 29, 2020

Here is the parquet file I used for my test to save you the trouble of having to make one.

data.zip

@imback82
Contributor

Thanks @akamor for reporting this issue! @elvaliuliuliu Can you help with this?

@elvaliuliuliu
Contributor

I will take a look!

@elvaliuliuliu
Contributor

@akamor Thanks for providing the repro as well as the parquet data. I have run this test multiple times (>10) and they all succeeded. How frequently are you getting this error?

@akamor
Author

akamor commented Oct 29, 2020

@elvaliuliuliu
This repros for me every time without fail. To double-check, I just wiped away my initial setup and followed the repro instructions above to verify that what I posted is indeed reproducible. I've included my exact steps below:

In PowerShell (not WSL):

mkdir spark_cache_bug
cd spark_cache_bug
dotnet new console
dotnet add package Microsoft.Spark

Now open Program.cs in your editor of choice and copy in the contents of my Program.cs from above.

You'll need to provide your own credentials for AWS on lines 50 and 51, and you'll need to extract the zip file and upload the parquet to an S3 folder. You'll have to reference that folder on line 52 and then reference a desired output folder on line 53.

I am not sure if this will reproduce if not using s3a. If you don't have an AWS setup I can likely DM you the necessary credentials to use.

dotnet build 
cd bin/Debug/netcoreapp3.1
spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.2 --master local .\microsoft-spark-3-0_2.12-1.0.0.jar dotnet .\spark_cache_bug.dll

@suhsteve
Member

Does this only repro if you use S3? What if you read/write to local storage?

@elvaliuliuliu
Contributor

elvaliuliuliu commented Oct 29, 2020

I am running the app in my local environment, which does not repro this issue. @akamor it would be helpful if you could DM me your AWS setup.

cc: @MikeRys

@akamor
Author

akamor commented Oct 29, 2020

@suhsteve I've only ever repro'd this on S3. Based on our investigation, I think there is likely some race condition, so it is believable that it might not occur using local storage.

Can y'all try on S3? I can also send over some creds if you need them, assuming it can be done in a secure way.

@akamor
Author

akamor commented Oct 29, 2020

@elvaliuliuliu Sorry, just saw you already asked for the AWS setup. I'll get that to you in a few minutes!

@akamor
Author

akamor commented Oct 29, 2020

@elvaliuliuliu Sorry, I don't actually know how to DM you. How do I do that? I'm ready to send the creds when I figure that out.

@elvaliuliuliu
Contributor

@elvaliuliuliu Sorry, I don't actually know how to DM you. How do I do that? I'm ready to send the creds when I figure that out.

If you would like to provide contact info, I can reach out.

@akamor
Author

akamor commented Oct 29, 2020

Sure. Can you reach out to hello@tonic.ai? I'll see the e-mail and respond.

@akamor
Author

akamor commented Oct 29, 2020

@elvaliuliuliu Details sent!

@elvaliuliuliu
Contributor

@elvaliuliuliu Details sent!

Sorry, I don't seem to have received anything yet.

@akamor
Author

akamor commented Oct 29, 2020

Just sent again. Please check spam folder, I guess?

@elvaliuliuliu
Contributor

Just sent again. Please check spam folder, I guess?

Yep, got it. Thanks!

@elvaliuliuliu
Contributor

@akamor I was able to repro this issue with Spark 3.0. Thanks for providing all the details! However, Spark 2.4 works in this case. Maybe you can try to unblock yourself with Spark 2.4 for now, and I will take a closer look at this with Spark 3.0. Thanks.

@akamor
Author

akamor commented Oct 30, 2020

Moving back to Spark 2.4 is not an option for us, unfortunately. We have a non-ideal fix at the moment which we've implemented in RowConstructor.GetRow.

Instead of passing _args into the Row constructor, we copy _args into a new object[] first and then pass the copy along. I'm sure we are taking a performance hit there, but it does fix things. It is also why we think this is somehow a caching or race issue, but we never really investigated too far.

Another option we discussed briefly was just adding some type checking in the TimestampType and DateType FromInternal methods to simply return early if the value passed in is already the correct type (i.e. not a long or int). Of course, this is a band-aid and does not fix the underlying issue, whatever it may be.
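
A rough sketch of what that guard could look like, using a plain DateTime as a stand-in for Microsoft.Spark's Timestamp type (hypothetical names; this is not the actual library code):

using System;

// Hypothetical illustration of the "early return" guard described above; the real
// TimestampType.FromInternal lives inside Microsoft.Spark and is not reproduced here.
internal static class FromInternalGuardSketch
{
    // Converts Spark's internal microseconds-since-epoch value, but returns the
    // value unchanged if an earlier pass has already converted it.
    public static object FromInternal(object obj)
    {
        if (obj is DateTime alreadyConverted)
        {
            return alreadyConverted;           // second pass: nothing left to do
        }

        long micros = Convert.ToInt64(obj);    // first pass: raw int/long from the pickler
        return DateTimeOffset.FromUnixTimeMilliseconds(micros / 1000).UtcDateTime;
    }

    private static void Main()
    {
        object first = FromInternal(86_400_000_000L);  // 1970-01-02 00:00:00 UTC
        object second = FromInternal(first);           // would have thrown without the guard
        Console.WriteLine($"{first} == {second}");
    }
}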

Do any of y'all have thoughts on the above?

@elvaliuliuliu
Contributor

I agree with @akamor, the above two methods are not permanent fixes. I will investigate the underlying issue further and get back on this.

@imback82
Contributor

imback82 commented Nov 3, 2020

Any update on this? We need to decide whether or not this is a blocker for the 1.0.1 release.

@akamor
Author

akamor commented Nov 3, 2020

@imback82 It is certainly causing issues for us in multiple production deployments. We'd be very appreciative if this could get fixed in 1.0.1. We can also assist with testing if that helps.

@elvaliuliuliu
Contributor

Spark 3.0 works fine in the local environment but not with remote S3 storage in this case. I am actually not sure what causes this issue with remote storage on Spark 3.0.

@akamor I have a potential fix (PR #765) which should unblock you. Please kindly try it and let me know if this doesn't address the issue.

Any other ideas are very welcome.

@akamor
Author

akamor commented Nov 4, 2020

Thanks @elvaliuliuliu I'll give it a whirl in a moment. We actually did a similar fix on our side but decided to go more heavy-handed in case there were other hidden issues.

It seems like the real issue here is re-use of the arrays storing values before they are passed to the Row constructor (line 104 of spark/src/csharp/Microsoft.Spark/Sql/RowConstructor.cs). Is that your take as well? If so, is it possible this issue could creep up again in another subtle way?

@akamor
Author

akamor commented Nov 4, 2020

I can confirm the PR fixes the issue in our test cases. Thank you very much for such a rapid turnaround.

With that being said, I would be curious to hear your thoughts on my above point.

@imback82 added this to the 1.0.1 milestone Nov 5, 2020
@elvaliuliuliu
Contributor

elvaliuliuliu commented Nov 13, 2020

Following the repro and data provided by @akamor, please see my repro and investigation below:

To repro, I copied the following into Basic.cs and read the data locally:

// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;

namespace Microsoft.Spark.Examples.Sql.Batch
{
    /// <summary>
    /// A simple example demonstrating basic Spark SQL features.
    /// </summary>
    internal sealed class Basic : IExample
    {
        public void Run(string[] args)
        {
            if (args.Length != 1)
            {
                Console.Error.WriteLine(
                    "Usage: Basic <path to SPARK_HOME/examples/src/main/resources/people.json>");
                Environment.Exit(1);
            }

            var spark = SparkSession.Builder().GetOrCreate();

            var df = spark.Read().Parquet("test.parquet");

            var expressionMap = df.Schema().Fields.ToDictionary(f => f.Name, f => df.Col(f.Name));

            Generate(expressionMap, "timestamp1", df);
            Generate(expressionMap, "timestamp2", df);

            df = df.Select(df.Schema().Fields.Select(f => expressionMap[f.Name]).ToArray());         
        }

        public static void Generate(Dictionary<string, Column> expressionMap, string column, DataFrame df)
        {
            var typeMapper = new Dictionary<string, DataType>();
            foreach (var f in df.Schema().Fields)
            {
                typeMapper.Add(f.Name, f.DataType);
            }

            var returnType = new StructType(new[] { new StructField(column, new StringType()) });

            var udf = SparkGenerator.Generate(returnType, column);

            var newCol = udf(Struct(expressionMap[column]));
            expressionMap[column] = newCol.GetField(column).Cast(typeMapper[column].SimpleString).Alias(column);

        }
    }

    public static class SparkGenerator
    {
        public static Func<Column, Column> Generate(StructType returnType, string column) =>
            Udf<Row>(row => new GenericRow(new string[] { "1970-01-02 00:00:00" }), returnType);
    }
}

input file:
test.zip

Then I ran the following spark-submit command:

%SPARK_HOME%/bin/spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local microsoft-spark-3-0_2.12-1.0.0.jar dotnet Microsoft.Spark.CSharp.Examples.dll Sql.Batch.Basic dummy

This produced the error:

Caused by: org.apache.spark.api.python.PythonException: System.InvalidCastException: Unable to cast object of type 'Microsoft.Spark.Sql.Types.Timestamp' to type 'System.Int32'.

Initial investigation:
1) Before unpickling: the RowConstructor contains two RowConstructors whose TimestampType values are still stored as int values, as shown in red.
[screenshot 1]
2) In Spark 3.0, after we unpickle the first RowConstructor, the second one (in the red circle) has already been converted to Timestamp even though it has not been iterated yet. This is what causes the problem described above.
[screenshot 2]

The second _args got converted because it is the same object as the first _args: each RowConstructor is converted only once, but both share the same _args instance, which causes the problem. See below:
[screenshot 3]
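
A tiny sketch of the aliasing the screenshots show, with a made-up RowData wrapper standing in for RowConstructor (illustrative only; none of these names come from the library):

using System;

// Two row wrappers that hold a reference to the same values array: converting the
// values through the first wrapper silently "converts" the second one as well.
internal sealed class RowData
{
    public RowData(object[] values) => Values = values;

    public object[] Values { get; }
}

internal static class SharedArgsSketch
{
    private static void Main()
    {
        object[] args = { 0L };           // raw internal timestamp value
        var first = new RowData(args);
        var second = new RowData(args);   // same backing array, not a copy

        Console.WriteLine(ReferenceEquals(first.Values, second.Values)); // True

        // Convert in place via the first wrapper, as the row conversion would.
        first.Values[0] = DateTimeOffset.FromUnixTimeMilliseconds(0).UtcDateTime;

        // The second wrapper now also holds a DateTime, even though it was never iterated.
        Console.WriteLine(second.Values[0].GetType()); // System.DateTime
    }
}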

cc: @imback82

@fwaris

fwaris commented Mar 5, 2021

Running into a similar issue.
Here is the schema of the DataFrame "dfpWA":
root
|-- pattern: string (nullable = true)
|-- window: struct (nullable = false)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- times: array (nullable = true)
| |-- element: timestamp (containsNull = false)
|-- count: integer (nullable = false)

Strangely, this fails:

let dfls = dfpWA.Select("times","window").ToLocalIterator() |> Seq.take 22 |> Seq.toArray

but both of the two lines below succeed:

let dfls1 = dfpWA.Select("times").ToLocalIterator() |> Seq.take 22 |> Seq.toArray
let dfls2 = dfpWA.Select("window").ToLocalIterator() |> Seq.take 22 |> Seq.toArray

The location (i.e. row #22) is somewhat arbitrary. I have seen the same failure at different positions when processing similar data.

I am running Spark 3.0.1.

Unfortunately I can't repro this easily as the data is proprietary.

Here is the stack trace:

System.InvalidCastException: Specified cast is not valid.
   at Microsoft.Spark.Sql.Types.TimestampType.FromInternal(Object obj)
   at Microsoft.Spark.Sql.Row.Convert()
   at Microsoft.Spark.Sql.RowConstructor.GetRow()
   at Microsoft.Spark.Sql.RowConstructor.GetRow()
   at Microsoft.Spark.Sql.RowCollector.<Collect>d__0.MoveNext()
   at Microsoft.Spark.Sql.RowCollector.LocalIteratorFromSocket.<GetEnumerator>d__6.MoveNext()
   at Microsoft.Spark.Sql.DataFrame.<ToLocalIterator>d__82.MoveNext()
   at Microsoft.FSharp.Collections.SeqModule.Take@686.GenerateNext(IEnumerable`1& next) in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\seq.fs:line 688
   at Microsoft.FSharp.Core.CompilerServices.GeneratedSequenceBase`1.MoveNextImpl() in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\seqcore.fs:line 368
   at System.Collections.Generic.List`1..ctor(IEnumerable`1 collection)
   at Microsoft.FSharp.Collections.SeqModule.ToArray[T](IEnumerable`1 source) in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\seq.fs:line 825
   at <StartupCode$FSI_0154>.$FSI_0154.main@() in c:\users\fwaris\source\repos\tgnapi\TGNApi\scripts\DeviceCorr.fsx:line 67
