Commit 9871900

Author: Christian Herrera
Message: Add new optimization for join
Parent: aef160c

File tree (3 files changed, +31 -36 lines):
  docker/docker-compose.yml
  src/main/com/codely/lesson_05_monitoring_and_optimizations/video_02_broadcast_join/JoinOptimizationApp.scala
  src/main/com/codely/lesson_05_monitoring_and_optimizations/video_03_skew_join/SkewJoinApp.scala

docker/docker-compose.yml

Lines changed: 2 additions & 2 deletions

@@ -40,7 +40,7 @@ services:
       - SPARK_MASTER=spark://spark-master:7077
       - SPARK_WORKER_CORES=3
       - SPARK_WORKER_MEMORY=3G
-      - SPARK_EXECUTOR_MEMORY=1G
+      - SPARK_EXECUTOR_MEMORY=3G
       - SPARK_WORKLOAD=worker
       - SPARK_LOCAL_IP=172.19.0.2
     volumes:
@@ -64,7 +64,7 @@ services:
       - SPARK_MASTER=spark://spark-master:7077
       - SPARK_WORKER_CORES=3
       - SPARK_WORKER_MEMORY=3G
-      - SPARK_EXECUTOR_MEMORY=1G
+      - SPARK_EXECUTOR_MEMORY=3G
       - SPARK_WORKLOAD=worker
       - SPARK_LOCAL_IP=172.19.0.3
     volumes:
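The compose change raises SPARK_EXECUTOR_MEMORY on both workers from 1G to 3G, matching SPARK_WORKER_MEMORY=3G so a single executor can use the whole worker. A minimal check from a spark-shell attached to this cluster (a sketch, not part of the commit; exact precedence between this environment variable and driver-side settings depends on the standalone setup):

  // Prints the executor memory the driver requested; Spark's own default is 1g
  // when neither the session nor the cluster environment overrides it.
  println(spark.sparkContext.getConf.get("spark.executor.memory", "1g (Spark default)"))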

src/main/com/codely/lesson_05_monitoring_and_optimizations/video_02_broadcast_join/JoinOptimizationApp.scala

Lines changed: 17 additions & 9 deletions

@@ -1,10 +1,10 @@
 package com.codely.lesson_05_monitoring_and_optimizations.video_02_broadcast_join
 
-import org.apache.spark.sql.functions.{broadcast, col}
+import org.apache.spark.sql.functions.broadcast
 
 object JoinOptimizationApp extends SparkApp {
 
-  // ./bin/spark-shell --master spark://spark-master:7077 --driver-memory 4g --executor-memory 1024mb --conf spark.sql.adaptive.enabled=false
+  // ./bin/spark-shell --master spark://spark-master:7077 --driver-memory 3g --conf spark.sql.adaptive.enabled=false
 
   spark.sparkContext.setLogLevel("WARN")
 
@@ -13,22 +13,30 @@ object JoinOptimizationApp extends SparkApp {
   spark.sparkContext.setJobGroup("join without optimization", "join without optimization")
 
   val largeDF = spark
-    .range(0, 10000000L, 3) // 3.3 M
+    .range(0, 10000000L) // 10M
     .map(i => (i, s"Name$i"))
-    .toDF("id", "name")
+    .toDF("id", "fieldA")
 
-  val veryLargeDF = spark
-    .range(0, 100000000L, 2) // 50 M
+  val veryLargeDF = spark // 50 M
+    .range(0, 50000000L)
     .map(i => (i, s"Other$i"))
-    .toDF("id", "other")
+    .toDF("id", "fieldB")
 
-  veryLargeDF.join(largeDF, "id").filter(col("id") === 1).show(false)
+  veryLargeDF.join(largeDF, "id").count()
+
+  spark.sparkContext.clearJobGroup()
+
+  spark.sparkContext.setJobGroup("join with 12 shuffle partitions", "join with 12 shuffle partitions")
+
+  spark.conf.set("spark.sql.shuffle.partitions", "12")
+
+  veryLargeDF.join(largeDF, "id").count()
 
   spark.sparkContext.clearJobGroup()
 
   spark.sparkContext.setJobGroup("join with optimization", "join with optimization")
 
-  veryLargeDF.join(broadcast(largeDF), "id").show()
+  veryLargeDF.join(broadcast(largeDF), "id").count()
 
   spark.sparkContext.clearJobGroup()
 
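With adaptive execution disabled by the launch command above, the difference between the job groups is easiest to see in the physical plans rather than in timings alone. A short sketch, assuming the same largeDF and veryLargeDF definitions from this file:

  // Plain equi-join: both sides are shuffled, typically planned as a SortMergeJoin.
  veryLargeDF.join(largeDF, "id").explain()

  // Hinted join: largeDF is shipped to every executor, so veryLargeDF is not shuffled;
  // expect a BroadcastHashJoin node in the plan instead.
  veryLargeDF.join(broadcast(largeDF), "id").explain()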

src/main/com/codely/lesson_05_monitoring_and_optimizations/video_03_skew_join/SkewJoinApp.scala

Lines changed: 12 additions & 25 deletions

@@ -4,53 +4,40 @@ import org.apache.spark.sql.functions.when
 
 object SkewJoinApp extends SparkApp {
 
-  // ./bin/spark-shell --master spark://spark-master:7077 --driver-memory 4g --executor-memory 1024mb --conf spark.sql.autoBroadcastJoinThreshold=-1 --conf spark.sql.adaptive.enabled=false
+  // ./bin/spark-shell --master spark://spark-master:7077 --driver-memory 3g --executor-memory 1024mb --conf spark.sql.autoBroadcastJoinThreshold=-1 --conf spark.sql.adaptive.enabled=false
 
   spark.sparkContext.setLogLevel("WARN")
 
   import spark.implicits._
 
   spark.sparkContext.setJobGroup("skewed data", "skewed data")
 
-  val skewedData = spark
-    .range(0, 10000000) // 10M
-    .withColumn("key", when($"id" < 10, $"id").otherwise(999))
-    .withColumn("value", $"id")
-
   val uniformData = spark
-    .range(0, 1000000) // 1M
+    .range(0, 10000000) // 10M
     .withColumn("key", $"id")
     .withColumn("value", $"id")
 
-  val joined = skewedData.join(uniformData, "key")
+  val skewedData = spark
+    .range(0, 200000000) // 200M
+    .withColumn("key", when($"id" < 10000000, $"id").otherwise(999))
+    .withColumn("value", $"id")
 
-  val res = joined.filter($"key" === 999).count()
-  println(s"Count for skew key (999): $res")
+  skewedData.join(uniformData, "key").count()
 
   spark.sparkContext.clearJobGroup()
 
   spark.sparkContext.setJobGroup("adaptative query execution", "adaptative query execution")
 
   spark.conf.set("spark.sql.adaptive.enabled", "true")
+  spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "1")
+  spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "20MB")
+  spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "15MB")
 
-  import org.apache.spark.sql.functions._
-
-  val skewedDataAQE = spark
-    .range(0, 10000000) // 10M
-    .withColumn("key", when($"id" < 10, $"id").otherwise(999))
-    .withColumn("value", $"id")
-
-  val uniformDataAQE = spark
-    .range(0, 1000000) // 1M
-    .withColumn("key", $"id")
-    .withColumn("value", $"id")
-
-  val joinedAQE = skewedDataAQE.join(uniformDataAQE, "key")
+  val joinedAQE = skewedData.join(uniformData, "key")
 
   joinedAQE.explain(true)
 
-  val resAQE = joinedAQE.filter($"key" === 999).count()
-  println(s"Count for skew key (999): $resAQE")
+  joinedAQE.count()
 
   spark.sparkContext.clearJobGroup()
 
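The rewritten skewedData puts ids below 10M on their own keys and the remaining ~190M rows all on key 999, which is the skew the AQE settings above (skewedPartitionFactor, skewedPartitionThresholdInBytes, advisoryPartitionSizeInBytes) are tuned to split. A quick way to eyeball that distribution, as a sketch reusing the same DataFrame:

  // Per-key row counts: almost everything collapses onto key 999, so one shuffle
  // partition of the join dwarfs the rest until AQE splits it into smaller chunks.
  skewedData
    .groupBy("key")
    .count()
    .orderBy($"count".desc)
    .show(5, truncate = false)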
