Skip to content

Commit 8a3ca95

Browse files
authored
add statistics and remove repartition 1 (#40)
1 parent e87f9d8 commit 8a3ca95

File tree

2 files changed

+14
-3
lines changed

2 files changed

+14
-3
lines changed

nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,25 @@ object Main {
7979
val sparkConfig = SparkConfig.getSpark(configs)
8080
val partitionNum = sparkConfig.partitionNum
8181

82+
val startTime = System.currentTimeMillis()
8283
// reader
83-
val dataSet = createDataSource(sparkConfig.spark, configs, partitionNum)
84+
val dataSet = createDataSource(sparkConfig.spark, configs, partitionNum)
85+
val readTime = System.currentTimeMillis()
8486

8587
// algorithm
8688
val algoResult = executeAlgorithm(sparkConfig.spark, algoName, configs, dataSet)
89+
val algoTime = System.currentTimeMillis()
90+
8791
// writer
8892
saveAlgoResult(algoResult, configs)
93+
val endTime = System.currentTimeMillis()
8994

95+
sparkConfig.spark.stop()
96+
val readDuration = ((readTime - startTime) / 1000.0).formatted("%.4f")
97+
val algoDuration = ((algoTime - readTime) / 1000.0).formatted("%.4f")
98+
val writeDuration = ((endTime - algoTime) / 1000.0).formatted("%.4f")
99+
LOGGER.info(
100+
s"read data source cost: $readDuration s, algo cost: $algoDuration s, write algo result cost: $writeDuration s")
90101
sys.exit(0)
91102
}
92103

nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,13 @@ class NebulaWriter(data: DataFrame, configs: Configs) extends AlgoWriter(data, c
5050
class CsvWriter(data: DataFrame, configs: Configs) extends AlgoWriter(data, configs) {
5151
override def write(): Unit = {
5252
val resultPath = configs.localConfigEntry.resultPath
53-
data.repartition(1).write.option("header", true).csv(resultPath)
53+
data.write.option("header", true).csv(resultPath)
5454
}
5555
}
5656

5757
class TextWriter(data: DataFrame, configs: Configs) extends AlgoWriter(data, configs) {
5858
override def write(): Unit = {
5959
val resultPath = configs.localConfigEntry.resultPath
60-
data.repartition(1).write.option("header", true).text(resultPath)
60+
data.write.option("header", true).text(resultPath)
6161
}
6262
}

0 commit comments

Comments
 (0)