-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-53696][PYTHON][CONNECT][SQL] Default to bytes for BinaryType in PySpark #52467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
ac5d83c
1e702c9
659fe4e
e791532
6787bfc
453298d
c859ce7
09bfef3
238e2b7
8db164c
b10fc9d
7d3cf56
922c1a4
16e8428
fd5fcb2
fd687f2
ee5d2c2
31f52c8
d63d7b3
91aed20
9ce5dfd
e488e3b
4683449
852f9d9
b933d61
fa202b1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -524,7 +524,7 @@ def _create_converter( | |
|
||
@staticmethod | ||
def _create_converter( | ||
dataType: DataType, *, none_on_identity: bool = False | ||
dataType: DataType, *, none_on_identity: bool = False, binary_as_bytes: bool = True | ||
) -> Optional[Callable]: | ||
assert dataType is not None and isinstance(dataType, DataType) | ||
|
||
|
@@ -542,7 +542,9 @@ def _create_converter( | |
dedup_field_names = _dedup_names(field_names) | ||
|
||
field_convs = [ | ||
ArrowTableToRowsConversion._create_converter(f.dataType, none_on_identity=True) | ||
ArrowTableToRowsConversion._create_converter( | ||
f.dataType, none_on_identity=True, binary_as_bytes=binary_as_bytes | ||
) | ||
for f in dataType.fields | ||
] | ||
|
||
|
@@ -564,7 +566,7 @@ def convert_struct(value: Any) -> Any: | |
|
||
elif isinstance(dataType, ArrayType): | ||
element_conv = ArrowTableToRowsConversion._create_converter( | ||
dataType.elementType, none_on_identity=True | ||
dataType.elementType, none_on_identity=True, binary_as_bytes=binary_as_bytes | ||
) | ||
|
||
if element_conv is None: | ||
|
@@ -589,10 +591,10 @@ def convert_array(value: Any) -> Any: | |
|
||
elif isinstance(dataType, MapType): | ||
key_conv = ArrowTableToRowsConversion._create_converter( | ||
dataType.keyType, none_on_identity=True | ||
dataType.keyType, none_on_identity=True, binary_as_bytes=binary_as_bytes | ||
) | ||
value_conv = ArrowTableToRowsConversion._create_converter( | ||
dataType.valueType, none_on_identity=True | ||
dataType.valueType, none_on_identity=True, binary_as_bytes=binary_as_bytes | ||
) | ||
|
||
if key_conv is None: | ||
|
@@ -646,7 +648,7 @@ def convert_binary(value: Any) -> Any: | |
return None | ||
else: | ||
assert isinstance(value, bytes) | ||
return bytearray(value) | ||
return value if binary_as_bytes else bytearray(value) | ||
|
||
return convert_binary | ||
|
||
|
@@ -676,7 +678,7 @@ def convert_timestample_ntz(value: Any) -> Any: | |
udt: UserDefinedType = dataType | ||
|
||
conv = ArrowTableToRowsConversion._create_converter( | ||
udt.sqlType(), none_on_identity=True | ||
udt.sqlType(), none_on_identity=True, binary_as_bytes=binary_as_bytes | ||
) | ||
|
||
if conv is None: | ||
|
@@ -726,6 +728,13 @@ def convert( # type: ignore[overload-overlap] | |
) -> List[Row]: | ||
pass | ||
|
||
@overload | ||
@staticmethod | ||
def convert( | ||
table: "pa.Table", schema: StructType, *, binary_as_bytes: bool = True | ||
) -> List[Row]: | ||
pass | ||
|
||
@overload | ||
@staticmethod | ||
def convert( | ||
|
@@ -735,7 +744,11 @@ def convert( | |
|
||
@staticmethod # type: ignore[misc] | ||
def convert( | ||
table: "pa.Table", schema: StructType, *, return_as_tuples: bool = False | ||
table: "pa.Table", | ||
schema: StructType, | ||
*, | ||
return_as_tuples: bool = False, | ||
binary_as_bytes: bool = True, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe this should be controlled by a flag? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it is already controlled by a flag; it is not possible, or against the style, to access the SQL conf in this conversion.py |
||
) -> List[Union[Row, tuple]]: | ||
require_minimum_pyarrow_version() | ||
import pyarrow as pa | ||
|
@@ -748,7 +761,9 @@ def convert( | |
|
||
if len(fields) > 0: | ||
field_converters = [ | ||
ArrowTableToRowsConversion._create_converter(f.dataType, none_on_identity=True) | ||
ArrowTableToRowsConversion._create_converter( | ||
f.dataType, none_on_identity=True, binary_as_bytes=binary_as_bytes | ||
) | ||
for f in schema.fields | ||
] | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -854,14 +854,17 @@ class ArrowBatchUDFSerializer(ArrowStreamArrowUDFSerializer): | |
int_to_decimal_coercion_enabled : bool | ||
If True, applies additional coercions in Python before converting to Arrow | ||
This has performance penalties. | ||
binary_as_bytes : bool | ||
If True, binary type will be deserialized as bytes, otherwise as bytearray. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
timezone, | ||
safecheck, | ||
input_types, | ||
int_to_decimal_coercion_enabled=False, | ||
int_to_decimal_coercion_enabled, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the default value for |
||
binary_as_bytes, | ||
): | ||
super().__init__( | ||
timezone=timezone, | ||
|
@@ -871,6 +874,7 @@ def __init__( | |
) | ||
self._input_types = input_types | ||
self._int_to_decimal_coercion_enabled = int_to_decimal_coercion_enabled | ||
self._binary_as_bytes = binary_as_bytes | ||
|
||
def load_stream(self, stream): | ||
""" | ||
|
@@ -887,7 +891,9 @@ def load_stream(self, stream): | |
List of columns containing list of Python values. | ||
""" | ||
converters = [ | ||
ArrowTableToRowsConversion._create_converter(dt, none_on_identity=True) | ||
ArrowTableToRowsConversion._create_converter( | ||
dt, none_on_identity=True, binary_as_bytes=self._binary_as_bytes | ||
) | ||
for dt in self._input_types | ||
] | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we can briefly explain what the default behavior was before 4.1.0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ended up explaining it in more detail in the definition of
spark.sql.execution.pyspark.binaryAsBytes
in the SQL conf as it is really long...