Commit bc51417

RangePartitioner (+ minor config fixes)
1 parent 10a0420 commit bc51417

4 files changed: +33 -13 lines changed

docs/rdd/Partitioner.md

Lines changed: 0 additions & 1 deletion
@@ -27,6 +27,5 @@ numPartitions: Int
 
 ## Implementations
 
-* CoalescedPartitioner (Spark SQL)
 * [HashPartitioner](HashPartitioner.md)
 * [RangePartitioner](RangePartitioner.md)

docs/rdd/RangePartitioner.md

Lines changed: 31 additions & 10 deletions
@@ -1,14 +1,14 @@
 # RangePartitioner
 
-`RangePartitioner` is a [Partitioner](Partitioner.md) for **bucketed partitioning**.
+`RangePartitioner` is a [Partitioner](Partitioner.md) that partitions sortable records by range into roughly equal ranges (that can be used for **bucketed partitioning**).
 
-`RangePartitioner` is used for [sortByKey](OrderedRDDFunctions.md#sortByKey) operator (among other uses).
+`RangePartitioner` is used for [sortByKey](OrderedRDDFunctions.md#sortByKey) operator (_mostly_).
 
 ## Creating Instance
 
 `RangePartitioner` takes the following to be created:
 
-* <span id="partitions"> Number of Partitions
+* <span id="partitions"> Hint for the number of partitions
 * <span id="rdd"> Key-Value [RDD](RDD.md) (`RDD[_ <: Product2[K, V]]`)
 * <span id="ascending"> `ascending` flag (default: `true`)
 * <span id="samplePointsPerPartitionHint"> samplePointsPerPartitionHint (default: `20`)
@@ -19,33 +19,54 @@
 numPartitions: Int
 ```
 
-`numPartitions` is the length of the [rangeBounds](#rangeBounds) array plus `1`.
-
 `numPartitions` is part of the [Partitioner](Partitioner.md#numPartitions) abstraction.
 
+---
+
+`numPartitions` is 1 more than the length of the [range bounds](#rangeBounds) (since the number of [range bounds](#rangeBounds) is 0 for 0 or 1 partitions).
+
 ## <span id="getPartition"> Partition for Key
 
 ```scala
 getPartition(
   key: Any): Int
 ```
 
-`getPartition`...FIXME
-
 `getPartition` is part of the [Partitioner](Partitioner.md#getPartition) abstraction.
 
+---
+
+`getPartition` branches off based on the length of the [range bounds](#rangeBounds).
+
+For up to 128 range bounds, `getPartition` does a linear scan: it starts with candidate partition number `0` and walks over the [rangeBounds](#rangeBounds), incrementing the candidate on every iteration, for as long as the given `key` value is greater than the current range bound. The result is the index of the first range bound that is not less than the `key`, or the number of the [rangeBounds](#rangeBounds) (if the `key` is greater than all of them).
+
+For the number of the [rangeBounds](#rangeBounds) above 128, `getPartition`...FIXME
+
+In the end, `getPartition` returns the candidate partition number when [ascending](#ascending) is enabled, or flips it otherwise (to be the number of the [rangeBounds](#rangeBounds) minus the candidate partition number).
+
 ## <span id="rangeBounds"> Range Bounds
 
 ```scala
 rangeBounds: Array[K]
 ```
 
-`rangeBounds` is an `Array[K]`...FIXME
+`rangeBounds` is an array of upper bounds.
+
+For the [number of partitions](#partitions) up to and including 1, `rangeBounds` is an empty array.
+
+For more than 1 [partition](#partitions), `rangeBounds` determines the sample size per partition. The total sample size is the [samplePointsPerPartitionHint](#samplePointsPerPartitionHint) multiplied by the [number of partitions](#partitions), capped at `1e6`. `rangeBounds` allows for 3x over-sampling per partition.
+
+`rangeBounds` [sketches](#sketch) the keys of the [input rdd](#rdd) (with the `sampleSizePerPartition`).
+
+!!! note
+    There is more going on in `rangeBounds`.
+
+In the end, `rangeBounds` [determines the bounds](#determineBounds).
 
-### <span id="determineBounds"> determineBounds Utility
+### <span id="determineBounds"> determineBounds
 
 ```scala
-determineBounds[K : Ordering : ClassTag](
+determineBounds[K: Ordering](
   candidates: ArrayBuffer[(K, Float)],
   partitions: Int): Array[K]
 ```
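
Reviewer sketch (not part of the commit): the behaviour documented for `numPartitions`, the up-to-128 branch of `getPartition`, and the sample sizing in `rangeBounds` could look roughly like the following. This is a minimal illustration, not Spark's source. The object name `RangePartitionSketch`, the helper `sampleSizes`, the `rddPartitions` parameter and the fixed `Int` key type are all assumptions made for the example. In particular, dividing the over-sampled total by the number of input RDD partitions is an assumption about how the per-partition size is derived. The branch for more than 128 range bounds is left out because the document marks it as FIXME.

```scala
// Illustrative sketch only, not Spark's source.
// `RangePartitionSketch`, `sampleSizes`, `rddPartitions` and the Int key type are hypothetical.
object RangePartitionSketch {

  // numPartitions is one more than the number of range bounds.
  def numPartitions(rangeBounds: Array[Int]): Int = rangeBounds.length + 1

  // Linear scan (the "up to 128 range bounds" case): advance the candidate
  // partition while the key is greater than the current upper bound.
  def getPartition(rangeBounds: Array[Int], key: Int, ascending: Boolean = true): Int = {
    var partition = 0
    while (partition < rangeBounds.length && key > rangeBounds(partition)) {
      partition += 1
    }
    // Flip the partition number when ascending is disabled.
    if (ascending) partition else rangeBounds.length - partition
  }

  // Total sample size (hint * partitions, capped at 1e6) and the
  // 3x over-sampled size per input RDD partition (assumed derivation).
  def sampleSizes(
      samplePointsPerPartitionHint: Int,
      partitions: Int,
      rddPartitions: Int): (Double, Int) = {
    val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rddPartitions).toInt
    (sampleSize, sampleSizePerPartition)
  }
}
```

With the bounds `Array(10, 20, 30)`, a key of `10` lands in partition `0` and a key of `35` in partition `3`; with `ascending` disabled those become `3` and `0`.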

docs/scheduler/DAGScheduler.md

Lines changed: 1 addition & 1 deletion
@@ -1483,7 +1483,7 @@ The lookup table of all stages per `ActiveJob` id
 nextJobId: AtomicInteger
 ```
 
-`nextJobId` is a Java [AtomicInteger]({{ java.doc }}/java/util/concurrent/atomic/AtomicInteger.html) for job IDs.
+`nextJobId` is a Java [AtomicInteger]({{ java.api }}/java/util/concurrent/atomic/AtomicInteger.html) for job IDs.
 
 `nextJobId` starts at `0`.
 

docs/scheduler/DAGSchedulerEventProcessLoop.md

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@
 
 `DAGSchedulerEventProcessLoop` is registered under the name of **dag-scheduler-event-loop**.
 
-`DAGSchedulerEventProcessLoop` uses [java.util.concurrent.LinkedBlockingDeque]({{ java.doc }}/java/util/concurrent/LinkedBlockingDeque.html) blocking deque that can grow indefinitely.
+`DAGSchedulerEventProcessLoop` uses [java.util.concurrent.LinkedBlockingDeque]({{ java.api }}/java/util/concurrent/LinkedBlockingDeque.html) blocking deque that can grow indefinitely.
 
 ## Creating Instance
 
