diff --git a/samples/scala/.gitignore b/samples/scala/.gitignore new file mode 100644 index 00000000..a3103238 --- /dev/null +++ b/samples/scala/.gitignore @@ -0,0 +1,16 @@ +airports.dat +lib/ +spark-warehouse/ + +dist/* +target/ +lib_managed/ +src_managed/ +project/boot/ +project/plugins/project/ +.history +.cache +.lib/ +.env +.idea/ + diff --git a/samples/scala/README.md b/samples/scala/README.md new file mode 100644 index 00000000..075c7a02 --- /dev/null +++ b/samples/scala/README.md @@ -0,0 +1,40 @@ +azure-cosmosdb-spark-samples-scala +============================ +Examples of Cosmos DB on Spark that can serve as starting points for +Scala-based Cosmos DB Spark projects. + +Setup +----- +Before running any of the samples, you will need + +1. A working version of SBT (0.13.15 or higher recommended). + +2. The following environment variables set: + +``` +export COSMOS_DB_ENDPOINT= +export COSMOS_DB_MASTER_KEY= +``` + +3. A Cosmos DB database set up on Azure. You may run something like the +following Azure CLI commands: + +``` +az cosmosdb database create --db-name samples --url-connection $COSMOS_DB_ENDPOINT --key $COSMOS_DB_MASTER_KEY +az cosmosdb collection create --collection-name airports --db-name samples --throughput 5000 --url-connection $COSMOS_DB_ENDPOINT --key $COSMOS_DB_MASTER_KEY +``` + +Running Samples +--------------- +The easiest way to run the samples is to build an uber jar and submit it to +spark like so (replace +`com.microsoft.partnercatalyst.cosmosdb.samples.CountByCountry` with your preferred +sample): + +``` +sbt assembly +spark-submit --class com.microsoft.partnercatalyst.cosmosdb.samples.CountByCountry target/scala-2.11/azure-cosmosdb-spark-samples-assembly-0.1-SNAPSHOT.jar +``` + +You may also run the examples by opening this project in IntelliJ. 
+ diff --git a/samples/scala/build.sbt b/samples/scala/build.sbt new file mode 100644 index 00000000..ff30b584 --- /dev/null +++ b/samples/scala/build.sbt @@ -0,0 +1,45 @@ +organization := "com.github.catalystcode" +name := "azure-cosmosdb-spark-samples" +description := "Scala examples of Cosmos DB on Spark that can serve as starting points for Cosmos DB-based Spark projects." + +scalaVersion := "2.11.7" + +scalacOptions ++= Seq( + "-unchecked", + "-deprecation", + "-feature" +) + +val sparkVersion = "2.2.0" + +libraryDependencies ++= Seq( + "com.microsoft.azure" % "azure-documentdb" % "1.12.0", + "com.microsoft.azure" % "azure-documentdb-rx" % "0.9.0-rc1", + "com.microsoft.azure" % "azure-cosmosdb-spark_2.2.0_2.11" % "0.0.3" excludeAll( + ExclusionRule(organization = "org.apache.tinkerpop"), + ExclusionRule(organization = "com.fasterxml.jackson.core", name = "jackson-databind") + ), + + "org.apache.tinkerpop" % "spark-gremlin" % "3.2.5" excludeAll( + ExclusionRule(organization = "org.apache.spark"), + ExclusionRule(organization = "org.scala-lang") + ), + "org.apache.tinkerpop" % "tinkergraph-gremlin" % "3.2.5", + + "com.fasterxml.jackson.core" % "jackson-annotations" % "2.8.0", + "com.fasterxml.jackson.core" % "jackson-core" % "2.8.3", + "com.fasterxml.jackson.core" % "jackson-databind" % "2.8.3", + "com.fasterxml.jackson.module" % "jackson-module-paranamer" % "2.8.9", + "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.8.9", + + "org.apache.spark" %% "spark-core" % sparkVersion, + "org.apache.spark" %% "spark-sql" % sparkVersion, + "org.apache.spark" %% "spark-streaming" % sparkVersion, + "org.apache.spark" %% "spark-mllib" % sparkVersion +).map(_ % "compile") + +assemblyMergeStrategy in assembly := { + case PathList("META-INF", xs @ _*) => MergeStrategy.discard + case x => MergeStrategy.first +} + diff --git a/samples/scala/project/assembly.sbt b/samples/scala/project/assembly.sbt new file mode 100644 index 00000000..af03a8cf --- /dev/null +++ 
b/samples/scala/project/assembly.sbt @@ -0,0 +1,2 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5") + diff --git a/samples/scala/project/build.properties b/samples/scala/project/build.properties new file mode 100644 index 00000000..64317fda --- /dev/null +++ b/samples/scala/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.15 diff --git a/samples/scala/src/main/scala/com/microsoft/partnercatalyst/cosmosdb/samples/Airport.scala b/samples/scala/src/main/scala/com/microsoft/partnercatalyst/cosmosdb/samples/Airport.scala new file mode 100644 index 00000000..381664c0 --- /dev/null +++ b/samples/scala/src/main/scala/com/microsoft/partnercatalyst/cosmosdb/samples/Airport.scala @@ -0,0 +1,16 @@ +package com.microsoft.partnercatalyst.cosmosdb.samples + +case class Airport(airportId: String, + name: String, + city: String, + country: String, + iata: String /* 3-letter IATA code. Null if not assigned/unknown.*/ , + icao: String /* 4-letter ICAO code.*/ , + latitude: Double, + longitude: Double, + altitude: Double, + timezone: Double, + dst: String, + tz: String, + airportType: String, + source: String) diff --git a/samples/scala/src/main/scala/com/microsoft/partnercatalyst/cosmosdb/samples/CountAggregates.scala b/samples/scala/src/main/scala/com/microsoft/partnercatalyst/cosmosdb/samples/CountAggregates.scala new file mode 100644 index 00000000..8e4e8285 --- /dev/null +++ b/samples/scala/src/main/scala/com/microsoft/partnercatalyst/cosmosdb/samples/CountAggregates.scala @@ -0,0 +1,41 @@ +package com.microsoft.partnercatalyst.cosmosdb.samples + +import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig} +import com.microsoft.azure.cosmosdb.spark.schema._ +import org.apache.spark.sql.SparkSession +import org.apache.spark.streaming.{Minutes, StreamingContext} +import org.apache.spark.{SparkConf, SparkContext} + +object CountAggregates { + + def main(args: Array[String]): Unit = { + val appName = this.getClass.getSimpleName + val conf = new 
SparkConf() + .setAppName(appName) + .setIfMissing("spark.master", "local[*]") + val sc = new SparkContext(conf) + val ssc = new StreamingContext(sc, Minutes(1)) + val spark = SparkSession.builder().appName(ssc.sparkContext.appName).getOrCreate() + import spark.implicits._ + + sc.setLogLevel("ERROR") + + val configMap = Map[String, String]( + CosmosDBConfig.Endpoint -> sys.env("COSMOS_DB_ENDPOINT"), + CosmosDBConfig.Masterkey -> sys.env("COSMOS_DB_MASTER_KEY"), + CosmosDBConfig.Database -> "samples", + CosmosDBConfig.Collection -> "airports", + CosmosDBConfig.ConnectionMode -> "Gateway" + ) + val cosmosConfig = Config(configMap) + + val airportsDF = spark.read.cosmosDB(cosmosConfig).as[Airport] + airportsDF.createOrReplaceTempView("airports") + + spark.sqlContext.sql("select airports.country, count(1) as airport_count from airports group by airports.country") + .createOrReplaceTempView("airport_counts") + + spark.sqlContext.sql("select min(airport_count), max(airport_count), avg(airport_count), stddev_pop(airport_count) from airport_counts") + .show() + } +} diff --git a/samples/scala/src/main/scala/com/microsoft/partnercatalyst/cosmosdb/samples/CountByCountry.scala b/samples/scala/src/main/scala/com/microsoft/partnercatalyst/cosmosdb/samples/CountByCountry.scala new file mode 100644 index 00000000..bf88e0ca --- /dev/null +++ b/samples/scala/src/main/scala/com/microsoft/partnercatalyst/cosmosdb/samples/CountByCountry.scala @@ -0,0 +1,68 @@ +package com.microsoft.partnercatalyst.cosmosdb.samples + +import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig} +import com.microsoft.azure.cosmosdb.spark.schema._ +import org.apache.spark.rdd.RDD._ +import org.apache.spark.sql.SparkSession +import org.apache.spark.streaming.{Minutes, StreamingContext} +import org.apache.spark.{SparkConf, SparkContext} + +object CountByCountry { + + def main(args: Array[String]): Unit = { + val appName = this.getClass.getSimpleName + val conf = new SparkConf() + 
.setAppName(appName) + .setIfMissing("spark.master", "local[*]") + val sc = new SparkContext(conf) + val ssc = new StreamingContext(sc, Minutes(1)) + val spark = SparkSession.builder().appName(ssc.sparkContext.appName).getOrCreate() + import spark.implicits._ + + val cosmosConfig = Config(Map( + CosmosDBConfig.Endpoint -> sys.env("COSMOS_DB_ENDPOINT"), + CosmosDBConfig.Masterkey -> sys.env("COSMOS_DB_MASTER_KEY"), + CosmosDBConfig.Database -> "samples", + CosmosDBConfig.Collection -> "airports", + CosmosDBConfig.ConnectionMode -> "Gateway" + )) + + val airportsDF = spark.read.cosmosDB(cosmosConfig).as[Airport] + airportsDF.cache() + airportsDF.show() + + // Count number of aiports by country using reduceByKey from the incoming RDD + val rddStart = System.currentTimeMillis() + val reducedByKey = airportsDF.map(r=>(r.country, 1)) + .rdd + .reduceByKey(_+_) + .toDF("country", "airport_count") + reducedByKey.show() + val rddStop = System.currentTimeMillis() + + airportsDF.createOrReplaceTempView("airports") + val airportsTempView = airportsDF.sqlContext.table("airports") + airportsTempView.show() + + // Perform the same count using SQL + val sqlStart = System.currentTimeMillis() + val counts = airportsDF.sqlContext.sql("select airports.country, count(1) as airport_count from airports group by airports.country") + counts.show() + val sqlStop = System.currentTimeMillis() + + // Once again, perform the count by fetching from the temp table and calling reduceByKey. 
+ val rdd2Start = System.currentTimeMillis() + airportsTempView.select("country", "airportId") + .rdd + .map(row=>(row.getString(1), 1)) + .reduceByKey(_+_) + .toDF("country", "airport_count") + .show() + val rdd2Stop = System.currentTimeMillis() + + println(s"sql time in millis: ${sqlStop - sqlStart}") + println(s"rdd time in millis: ${rddStop - rddStart}") + println(s"rdd2 time in millis: ${rdd2Stop - rdd2Start}") + } + +} diff --git a/samples/scala/src/main/scala/com/microsoft/partnercatalyst/cosmosdb/samples/UploadToCosmos.scala b/samples/scala/src/main/scala/com/microsoft/partnercatalyst/cosmosdb/samples/UploadToCosmos.scala new file mode 100644 index 00000000..473acaa1 --- /dev/null +++ b/samples/scala/src/main/scala/com/microsoft/partnercatalyst/cosmosdb/samples/UploadToCosmos.scala @@ -0,0 +1,72 @@ +package com.microsoft.partnercatalyst.cosmosdb.samples + +import java.io.{File, PrintWriter} + +import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig} +import com.microsoft.azure.cosmosdb.spark.schema._ +import org.apache.spark.sql.{SaveMode, SparkSession} +import org.apache.spark.streaming.{Minutes, StreamingContext} +import org.apache.spark.{SparkConf, SparkContext} + +object UploadToCosmos { + + def main(args: Array[String]): Unit = { + val appName = this.getClass.getSimpleName + val conf = new SparkConf() + .setAppName(appName) + .setIfMissing("spark.master", "local[*]") + val sc = new SparkContext(conf) + val ssc = new StreamingContext(sc, Minutes(1)) + val spark = SparkSession.builder().appName(ssc.sparkContext.appName).getOrCreate() + + // Originally referenced in https://github.com/dennyglee/databricks/blob/master/notebooks/Users/denny%40databricks.com/flights/On-Time%20Flight%20Performance.py + val writer = new PrintWriter(new File("airports.dat")) + scala.io.Source.fromURL("https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat").getLines().foreach(writer.println) + + import spark.implicits._ + val airports = 
spark.read.csv("airports.dat").map(row => { + Airport( + row.getString(0), + row.getString(1), + row.getString(2), + row.getString(3), + row.getString(4), + row.getString(5), + SafeStringToDouble(row.getString(6)), + SafeStringToDouble(row.getString(7)), + SafeStringToDouble(row.getString(8)), + SafeStringToDouble(row.getString(9)), + row.getString(10), + row.getString(11), + row.getString(12), + row.getString(13) + ) + }) + airports.show() + + val cosmosConfig = Config(Map( + CosmosDBConfig.Endpoint -> sys.env("COSMOS_DB_ENDPOINT"), + CosmosDBConfig.Masterkey -> sys.env("COSMOS_DB_MASTER_KEY"), + CosmosDBConfig.Database -> "samples", + CosmosDBConfig.Collection -> "airports", + CosmosDBConfig.SamplingRatio -> "1.0", + CosmosDBConfig.QueryMaxRetryOnThrottled -> "10", + CosmosDBConfig.QueryMaxRetryWaitTimeSecs -> "10" + )) + + airports.write.mode(SaveMode.Append).cosmosDB(cosmosConfig) + } + + object SafeStringToDouble extends Serializable { + def apply(str: String): Double = { + try { + if (str == null) 0.0 else str.toDouble + } catch { + case nfe: NumberFormatException => 0.0 + } + } + } + +} + +