Commit 3670b67

fix: support nullable columns in pre-sorted data sources (apache#16783)
1 parent 9a98f71 commit 3670b67

File tree: 4 files changed (+79, -50 lines)


datafusion/datasource/src/file_scan_config.rs

Lines changed: 43 additions & 14 deletions
@@ -1908,13 +1908,28 @@ mod tests {
         struct File {
             name: &'static str,
             date: &'static str,
-            statistics: Vec<Option<(f64, f64)>>,
+            statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
         }
         impl File {
             fn new(
                 name: &'static str,
                 date: &'static str,
                 statistics: Vec<Option<(f64, f64)>>,
+            ) -> Self {
+                Self::new_nullable(
+                    name,
+                    date,
+                    statistics
+                        .into_iter()
+                        .map(|opt| opt.map(|(min, max)| (Some(min), Some(max))))
+                        .collect(),
+                )
+            }
+
+            fn new_nullable(
+                name: &'static str,
+                date: &'static str,
+                statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
             ) -> Self {
                 Self {
                     name,
@@ -1981,21 +1996,35 @@ mod tests {
                 sort: vec![col("value").sort(false, true)],
                 expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
             },
-            // reject nullable sort columns
             TestCase {
-                name: "no nullable sort columns",
+                name: "nullable sort columns, nulls last",
                 file_schema: Schema::new(vec![Field::new(
                     "value".to_string(),
                     DataType::Float64,
-                    true, // should fail because nullable
+                    true,
                 )]),
                 files: vec![
-                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
-                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
-                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
+                    File::new_nullable("0", "2023-01-01", vec![Some((Some(0.00), Some(0.49)))]),
+                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
+                    File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
                 ],
                 sort: vec![col("value").sort(true, false)],
-                expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\nbuild min rows\ncaused by\ncreate sorting columns\ncaused by\nError during planning: cannot sort by nullable column")
+                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
+            },
+            TestCase {
+                name: "nullable sort columns, nulls first",
+                file_schema: Schema::new(vec![Field::new(
+                    "value".to_string(),
+                    DataType::Float64,
+                    true,
+                )]),
+                files: vec![
+                    File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
+                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), Some(1.00)))]),
+                    File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
+                ],
+                sort: vec![col("value").sort(true, true)],
+                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]])
             },
             TestCase {
                 name: "all three non-overlapping",
@@ -2153,12 +2182,12 @@ mod tests {
                     .map(|stats| {
                         stats
                             .map(|(min, max)| ColumnStatistics {
-                                min_value: Precision::Exact(ScalarValue::from(
-                                    min,
-                                )),
-                                max_value: Precision::Exact(ScalarValue::from(
-                                    max,
-                                )),
+                                min_value: Precision::Exact(
+                                    ScalarValue::Float64(min),
+                                ),
+                                max_value: Precision::Exact(
+                                    ScalarValue::Float64(max),
+                                ),
                                 ..Default::default()
                             })
                             .unwrap_or_default()
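
For context on the ScalarValue::from -> ScalarValue::Float64 change above: ScalarValue::Float64 wraps an Option<f64>, so a missing min or max bound can be carried as None instead of requiring a non-null value. A minimal sketch, not part of the patch (the column_stats helper is invented for illustration):

use datafusion_common::stats::Precision;
use datafusion_common::ScalarValue;
use datafusion_physical_plan::ColumnStatistics;

/// Build per-column statistics where either bound may be null/unknown.
fn column_stats(min: Option<f64>, max: Option<f64>) -> ColumnStatistics {
    ColumnStatistics {
        // A None bound becomes ScalarValue::Float64(None) rather than an error.
        min_value: Precision::Exact(ScalarValue::Float64(min)),
        max_value: Precision::Exact(ScalarValue::Float64(max)),
        ..Default::default()
    }
}

fn main() {
    // Mirrors the new test case where a file's max statistic is unknown.
    let stats = column_stats(Some(0.50), None);
    println!("{stats:?}");
}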

datafusion/datasource/src/statistics.rs

Lines changed: 1 addition & 8 deletions
@@ -33,7 +33,7 @@ use arrow::{
     row::{Row, Rows},
 };
 use datafusion_common::stats::Precision;
-use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, Result};
+use datafusion_common::{plan_datafusion_err, DataFusionError, Result};
 use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::{ColumnStatistics, Statistics};
@@ -228,14 +228,7 @@ impl MinMaxStatistics {
             .zip(sort_columns.iter().copied())
             .map(|(sort_expr, column)| {
                 let schema = values.schema();
-
                 let idx = schema.index_of(column.name())?;
-                let field = schema.field(idx);
-
-                // check that sort columns are non-nullable
-                if field.is_nullable() {
-                    return plan_err!("cannot sort by nullable column");
-                }

                 Ok(SortColumn {
                     values: Arc::clone(values.column(idx)),
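
Dropping the nullability guard is possible because the arrow sort kernels already define where nulls go: each SortColumn carries SortOptions, and its nulls_first flag controls whether null values sort before or after non-null values. A minimal, self-contained sketch using arrow-rs directly, not the MinMaxStatistics code itself (the example data is invented):

use std::sync::Arc;

use arrow::array::{ArrayRef, Float64Array};
use arrow::compute::{lexsort_to_indices, SortColumn, SortOptions};
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // Per-file minimum values for a nullable sort column; one file has no min.
    let mins: ArrayRef = Arc::new(Float64Array::from(vec![Some(0.50), None, Some(0.00)]));

    let columns = [SortColumn {
        values: mins,
        options: Some(SortOptions {
            descending: false,
            // ASC NULLS LAST: the file with a null min sorts after the others.
            nulls_first: false,
        }),
    }];

    let indices = lexsort_to_indices(&columns, None)?;
    // Prints [2, 0, 1]: 0.00 first, then 0.50, then the null.
    println!("{:?}", indices.values());
    Ok(())
}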

datafusion/sqllogictest/test_files/parquet.slt

Lines changed: 2 additions & 4 deletions
@@ -130,8 +130,7 @@ STORED AS PARQUET;
 ----
 3

-# Check output plan again, expect no "output_ordering" clause in the physical_plan -> ParquetExec,
-# due to there being more files than partitions:
+# Check output plan again
 query TT
 EXPLAIN SELECT int_col, string_col
 FROM test_table
@@ -142,8 +141,7 @@ logical_plan
 02)--TableScan: test_table projection=[int_col, string_col]
 physical_plan
 01)SortPreservingMergeExec: [string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST]
-02)--SortExec: expr=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]}, projection=[int_col, string_col], file_type=parquet
+02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]}, projection=[int_col, string_col], output_ordering=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST], file_type=parquet


 # Perform queries using MIN and MAX

datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt

Lines changed: 33 additions & 24 deletions
@@ -38,20 +38,22 @@ CREATE TABLE src_table (
   bigint_col BIGINT,
   date_col DATE,
   overlapping_col INT,
-  constant_col INT
+  constant_col INT,
+  nulls_first_col INT,
+  nulls_last_col INT
 ) AS VALUES
 -- first file
-(1, 3, 'aaa', 100, 1, 0, 0),
-(2, 2, 'bbb', 200, 2, 1, 0),
-(3, 1, 'ccc', 300, 3, 2, 0),
+(1, 3, 'aaa', 100, 1, 0, 0, NULL, 1),
+(2, 2, 'bbb', 200, 2, 1, 0, NULL, 2),
+(3, 1, 'ccc', 300, 3, 2, 0, 1, 3),
 -- second file
-(4, 6, 'ddd', 400, 4, 0, 0),
-(5, 5, 'eee', 500, 5, 1, 0),
-(6, 4, 'fff', 600, 6, 2, 0),
+(4, 6, 'ddd', 400, 4, 0, 0, 2, 4),
+(5, 5, 'eee', 500, 5, 1, 0, 3, 5),
+(6, 4, 'fff', 600, 6, 2, 0, 4, 6),
 -- third file
-(7, 9, 'ggg', 700, 7, 3, 0),
-(8, 8, 'hhh', 800, 8, 4, 0),
-(9, 7, 'iii', 900, 9, 5, 0);
+(7, 9, 'ggg', 700, 7, 3, 0, 5, 7),
+(8, 8, 'hhh', 800, 8, 4, 0, 6, NULL),
+(9, 7, 'iii', 900, 9, 5, 0, 7, NULL);

 # Setup 3 files, in particular more files than there are partitions

@@ -90,45 +92,52 @@ CREATE EXTERNAL TABLE test_table (
   bigint_col BIGINT NOT NULL,
   date_col DATE NOT NULL,
   overlapping_col INT NOT NULL,
-  constant_col INT NOT NULL
+  constant_col INT NOT NULL,
+  nulls_first_col INT,
+  nulls_last_col INT
 )
 STORED AS PARQUET
 PARTITIONED BY (partition_col)
-WITH ORDER (int_col ASC NULLS LAST, bigint_col ASC NULLS LAST)
+WITH ORDER (
+  int_col ASC NULLS LAST,
+  bigint_col ASC NULLS LAST,
+  nulls_first_col ASC NULLS FIRST,
+  nulls_last_col ASC NULLS LAST
+)
 LOCATION 'test_files/scratch/parquet_sorted_statistics/test_table';

 # Order by numeric columns
 # This is to exercise file group sorting, which uses file-level statistics
 # DataFusion doesn't currently support string column statistics
 # This should not require a sort.
 query TT
-EXPLAIN SELECT int_col, bigint_col
+EXPLAIN SELECT int_col, bigint_col, nulls_first_col, nulls_last_col
 FROM test_table
-ORDER BY int_col, bigint_col;
+ORDER BY int_col, bigint_col, nulls_first_col NULLS FIRST, nulls_last_col NULLS LAST;
 ----
 logical_plan
-01)Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST
-02)--TableScan: test_table projection=[int_col, bigint_col]
+01)Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST, test_table.nulls_first_col ASC NULLS FIRST, test_table.nulls_last_col ASC NULLS LAST
+02)--TableScan: test_table projection=[int_col, bigint_col, nulls_first_col, nulls_last_col]
 physical_plan
-01)SortPreservingMergeExec: [int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST]
-02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, bigint_col], output_ordering=[int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST], file_type=parquet
+01)SortPreservingMergeExec: [int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST, nulls_first_col@2 ASC, nulls_last_col@3 ASC NULLS LAST]
+02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, bigint_col, nulls_first_col, nulls_last_col], output_ordering=[int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST, nulls_first_col@2 ASC, nulls_last_col@3 ASC NULLS LAST], file_type=parquet

 # Another planning test, but project on a column with unsupported statistics
 # We should be able to ignore this and look at only the relevant statistics
 query TT
 EXPLAIN SELECT string_col
 FROM test_table
-ORDER BY int_col, bigint_col;
+ORDER BY int_col, bigint_col, nulls_first_col NULLS FIRST, nulls_last_col NULLS LAST;
 ----
 logical_plan
 01)Projection: test_table.string_col
-02)--Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST
-03)----Projection: test_table.string_col, test_table.int_col, test_table.bigint_col
-04)------TableScan: test_table projection=[int_col, string_col, bigint_col]
+02)--Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST, test_table.nulls_first_col ASC NULLS FIRST, test_table.nulls_last_col ASC NULLS LAST
+03)----Projection: test_table.string_col, test_table.int_col, test_table.bigint_col, test_table.nulls_first_col, test_table.nulls_last_col
+04)------TableScan: test_table projection=[int_col, string_col, bigint_col, nulls_first_col, nulls_last_col]
 physical_plan
 01)ProjectionExec: expr=[string_col@0 as string_col]
-02)--SortPreservingMergeExec: [int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST]
-03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[string_col, int_col, bigint_col], output_ordering=[int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST], file_type=parquet
+02)--SortPreservingMergeExec: [int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST, nulls_first_col@3 ASC, nulls_last_col@4 ASC NULLS LAST]
+03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[string_col, int_col, bigint_col, nulls_first_col, nulls_last_col], output_ordering=[int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST, nulls_first_col@3 ASC, nulls_last_col@4 ASC NULLS LAST], file_type=parquet

 # Clean up & recreate but sort on descending column
 statement ok
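
What the new nullable columns exercise, stated briefly: file groups are ordered by per-file min/max statistics, and an explicit NULLS FIRST / NULLS LAST placement makes a null bound orderable instead of an error. A hypothetical, self-contained sketch of that ordering rule, not DataFusion code (the FileStat type and file names are invented):

/// Order per-file minimums ascending with nulls last, mirroring
/// the nulls_last_col ASC NULLS LAST ordering in the table above.
#[derive(Debug)]
struct FileStat {
    name: &'static str,
    min: Option<i64>, // None = this file's minimum is null/unknown
}

fn main() {
    let mut files = vec![
        FileStat { name: "2.parquet", min: Some(7) },
        FileStat { name: "0.parquet", min: Some(1) },
        FileStat { name: "1.parquet", min: None },
    ];

    files.sort_by(|a, b| match (a.min, b.min) {
        (Some(x), Some(y)) => x.cmp(&y),                // normal ascending order
        (Some(_), None) => std::cmp::Ordering::Less,    // non-null before null
        (None, Some(_)) => std::cmp::Ordering::Greater, // null sorts last
        (None, None) => std::cmp::Ordering::Equal,
    });

    // Expected order: 0.parquet, 2.parquet, 1.parquet
    for f in &files {
        println!("{f:?}");
    }
}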
