[BUG]: Applying UDFs to TimestampType causes occasional exception #760
Comments
Here is the parquet file I used for my test to save you the trouble of having to make one. |
Thanks @akamor for reporting this issue! @elvaliuliuliu Can you help with this? |
I will take a look! |
@akamor Thanks for providing the repro as well as the parquet data. I have run this test multiple times (>10) and it succeeded every time. How frequently are you getting this error? |
@elvaliuliuliu In PowerShell (not WSL):

mkdir spark_cache_bug
cd spark_cache_bug
dotnet new console
dotnet add package Microsoft.Spark

Now open Program.cs in your editor of choice and copy the contents of my Program.cs from above. You'll need to provide your own AWS credentials on lines 50 and 51, and you'll need to extract the zip file and upload the parquet to an S3 folder. You'll have to reference that folder on line 52 and a desired output folder on line 53. I am not sure if this will reproduce if not using s3a. If you don't have an AWS setup I can likely DM you the necessary credentials to use. Then:

dotnet build
cd bin/Debug/netcoreapp3.1
spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.2 --master local .\microsoft-spark-3-0_2.12-1.0.0.jar dotnet .\spark_cache_bug.dll
|
Does this only repro if you use S3? What if you read/write to local storage? |
@suhsteve I've only ever repro'd this on S3. Based on our investigation I think there is likely some race condition, so it is believable that it might not occur using local storage, imo. Can y'all try on S3? I can also send over some creds if you need them, assuming it can be done in a secure way. |
@elvaliuliuliu Sorry, just saw you already asked for the AWS setup. I'll get that to you in a few minutes! |
@elvaliuliuliu Sorry, I don't actually know how to DM you. How do I do that? I'm ready to send the creds when I figure that out. |
If you would like to provide a contact info, I can reach out. |
Sure. Can you reach out to hello@tonic.ai. I'll see the e-mail and respond. |
@elvaliuliuliu Details sent! |
Sorry, I don't seem to have received anything yet. |
Just sent again. Please check spam folder, I guess? |
Yep, got it. Thanks! |
@akamor I was able to repro this issue with Spark 3.0. Thanks for providing all the details! However, Spark 2.4 works in this case. Maybe you can try to unblock yourself with Spark 2.4 for now and I will take a closer look at this with Spark 3.0. Thanks. |
Moving back to Spark 2.4 is not an option for us, unfortunately. We have a non-ideal fix at the moment which we've implemented in RowConstructor.GetRow: instead of passing _args into the Row constructor, we first copy _args into a new object[] and then pass that along. I'm sure we are taking a performance hit there, but it does fix things. It is also why we think this is somehow a caching or race issue, but we never really investigated too far. Another option we discussed briefly was adding some type checking in the TimestampType and DateType FromInternal methods to simply return early if the value passed in is already the correct type (i.e. not a long or int). Of course, this is a band-aid and does not fix the underlying issue, whatever it may be. Do any of y'all have thoughts on the above? |
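For reference, a minimal sketch of the two workarounds described above. The member names (_args, _schema), method signatures, and the conversion logic are assumptions about the shape of the Microsoft.Spark internals, not quotes of the actual source:

// Sketch of workaround 1: defensive copy in RowConstructor.GetRow, so each Row
// owns its own values array instead of sharing the reused _args buffer.
public Row GetRow()
{
    var copy = new object[_args.Length];
    Array.Copy(_args, copy, _args.Length);
    return new Row(copy, _schema);   // previously: new Row(_args, _schema)
}

// Sketch of workaround 2 (the band-aid): make TimestampType.FromInternal
// idempotent by returning early when the value has already been converted.
public override object FromInternal(object obj)
{
    if (obj is Timestamp alreadyConverted)
    {
        return alreadyConverted;     // skip the second conversion instead of casting
    }

    // Spark serializes timestamps as microseconds since the Unix epoch.
    long microseconds = (long)obj;
    return new Timestamp(
        new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).AddTicks(microseconds * 10));
}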
I agree with @akamor, the above two methods are not permanent fixes. I will investigate more on the underlying issue and get back on this. |
Any update on this? We need to decide whether this is a blocker for 1.0.1 release or not. |
@imback82 It is certainly causing issues for us in multiple production deployments. We'd be very appreciative if this could get fixed in 1.0.1. We can also assist with testing if that helps. |
Spark 3.0 works fine in a local environment but not with remote S3 storage in this case. I am actually not sure what caused this issue with remote storage for Spark 3.0. @akamor I have a potential fix (PR #765) which should unblock you. Please kindly try it and let me know if this doesn't address the issue. Any other ideas are very welcome. |
Thanks @elvaliuliuliu, I'll give it a whirl in a moment. We actually did a similar fix on our side but decided to go more heavy-handed in case there were other hidden issues. It seems like the real issue here is re-use of the arrays storing values before they are passed to the Row constructor (line 104 of spark/src/csharp/Microsoft.Spark/sql/RowConstructor.cs). Is that your take as well? If so, is it possible this issue could creep up again in another subtle way? |
I can confirm the PR fixes the issue in our test cases. Thank you very much for such a rapid turnaround. With that being said, I would be curious to hear your thoughts on my above point. |
Followed the repro and data provided by @akamor; please see below for my repro and investigation. To repro, I copied the following file into
input file: then I ran the following
Found the error:
Initial investigation: the second _args had already been converted because it is the same object as the first _args. Each RowConstructor is converted only once, but _args is shared between them, which is what causes the problem. See below: cc: @imback82 |
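To illustrate the aliasing this investigation describes, here is a small self-contained example; the names are illustrative stand-ins, not the actual Microsoft.Spark internals. Two rows share the same object[]; the first conversion replaces the long with a converted value in place, and the second conversion then fails when it tries to cast the already-converted value back to long.

using System;

class SharedArgsRepro
{
    // Stand-in for TimestampType.FromInternal: expects a long (microseconds
    // since the Unix epoch) and converts it to a DateTime.
    static object FromInternal(object raw)
    {
        long micros = (long)raw;  // throws InvalidCastException if raw was already converted
        return DateTimeOffset.FromUnixTimeMilliseconds(micros / 1000).UtcDateTime;
    }

    // Stand-in for the in-place conversion done when a row is materialized.
    static void ConvertRow(object[] args)
    {
        args[0] = FromInternal(args[0]);  // mutates the array it was handed
    }

    static void Main()
    {
        object[] sharedArgs = { 1_600_000_000_000_000L };

        ConvertRow(sharedArgs);  // first row: long -> DateTime, succeeds
        ConvertRow(sharedArgs);  // "second row" reuses the same array: InvalidCastException
    }
}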
Running into a similar issue. Strangely, this fails:

let dfls = dfpWA.Select("times","window").ToLocalIterator() |> Seq.take 22 |> Seq.toArray

but both of the two lines below succeed:

let dfls1 = dfpWA.Select("times").ToLocalIterator() |> Seq.take 22 |> Seq.toArray
let dfls2 = dfpWA.Select("window").ToLocalIterator() |> Seq.take 22 |> Seq.toArray

The location (i.e. row #22) is somewhat arbitrary; I have seen the same failure at different positions when processing similar data. I am running Spark 3.0.1. Unfortunately I can't repro this easily as the data is proprietary. Here is the stack trace:

System.InvalidCastException: Specified cast is not valid.
at Microsoft.Spark.Sql.Types.TimestampType.FromInternal(Object obj)
at Microsoft.Spark.Sql.Row.Convert()
at Microsoft.Spark.Sql.RowConstructor.GetRow()
at Microsoft.Spark.Sql.RowConstructor.GetRow()
at Microsoft.Spark.Sql.RowCollector.<Collect>d__0.MoveNext()
at Microsoft.Spark.Sql.RowCollector.LocalIteratorFromSocket.<GetEnumerator>d__6.MoveNext()
at Microsoft.Spark.Sql.DataFrame.<ToLocalIterator>d__82.MoveNext()
at Microsoft.FSharp.Collections.SeqModule.Take@686.GenerateNext(IEnumerable`1& next) in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\seq.fs:line 688
at Microsoft.FSharp.Core.CompilerServices.GeneratedSequenceBase`1.MoveNextImpl() in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\seqcore.fs:line 368
at System.Collections.Generic.List`1..ctor(IEnumerable`1 collection)
at Microsoft.FSharp.Collections.SeqModule.ToArray[T](IEnumerable`1 source) in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\seq.fs:line 825
at <StartupCode$FSI_0154>.$FSI_0154.main@() in c:\users\fwaris\source\repos\tgnapi\TGNApi\scripts\DeviceCorr.fsx:line 67 |
Describe the bug
When applying multiple UDFs to a dataframe, an error is thrown when at least one of the target columns of the UDFs is a TimestampType column.
To Reproduce
Below is a minimal reproduction you can use; it is launched via the spark-submit command shown in the comments above. It includes a comment on how to create the necessary parquet file, which must have two timestamp columns and a sufficient number of rows (~2k) for the issue to occur.
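The original Program.cs is not reproduced here; the sketch below shows the general shape of the repro described (a parquet file with two timestamp columns, two UDFs applied to them, and the result written back out). The column names, paths, and UDF bodies are assumptions:

using System;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;

class Program
{
    static void Main(string[] args)
    {
        SparkSession spark = SparkSession.Builder().GetOrCreate();

        // Parquet file with at least two timestamp columns ("ts1"/"ts2" here)
        // and roughly 2k+ rows; the issue was reported against an s3a:// path.
        DataFrame df = spark.Read().Parquet("s3a://my-bucket/spark_cache_bug/input.parquet");

        // Two UDFs, each targeting a timestamp column.
        Func<Column, Column> udf1 = Udf<Timestamp, Timestamp>(ts => ts);
        Func<Column, Column> udf2 = Udf<Timestamp, Timestamp>(ts => ts);

        df = df.WithColumn("ts1", udf1(df["ts1"]))
               .WithColumn("ts2", udf2(df["ts2"]));

        df.Write().Mode(SaveMode.Overwrite).Parquet("s3a://my-bucket/spark_cache_bug/output");

        spark.Stop();
    }
}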
The exception thrown is an InvalidCastException originating in TimestampType.FromInternal (see the stack trace in the comments above).
Our investigation
After investigating a bit, it appears to be because a given row's TimestampType column is sometimes passed to TimestampType.FromInternal multiple times (once per UDF call). The first time FromInternal is called it properly creates a new Timestamp object. The second time, however, that same Timestamp object is passed into the function again, where the function is expecting an int or long. The culprit might be RowConstructor.GetRow, which appears to re-use _args. When I modify GetRow to first make a copy of _args and pass the copy to the Row constructor, the issue goes away.
Additional context
This error does not always occur, but it occurs with high probability if the dataframe has at least ~2k rows.
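For anyone who wants to generate a comparable input file rather than use the attached parquet, here is a sketch that writes a parquet file with two timestamp columns and a few thousand rows. The column names, row count, values, and output path are assumptions chosen to match the description above:

using System;
using System.Linq;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;

class GenerateTestData
{
    static void Main()
    {
        SparkSession spark = SparkSession.Builder().GetOrCreate();

        // Two timestamp columns, matching the shape the repro needs.
        var schema = new StructType(new[]
        {
            new StructField("ts1", new TimestampType()),
            new StructField("ts2", new TimestampType())
        });

        // A few thousand rows, each with two timestamp values.
        var rows = Enumerable.Range(0, 2500)
            .Select(i => new GenericRow(new object[]
            {
                new Timestamp(new DateTime(2020, 1, 1).AddMinutes(i)),
                new Timestamp(new DateTime(2020, 6, 1).AddMinutes(i))
            }))
            .ToList();

        DataFrame df = spark.CreateDataFrame(rows, schema);
        df.Write().Mode(SaveMode.Overwrite).Parquet("test_timestamps.parquet");

        spark.Stop();
    }
}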