parts = com.microsoft.azure.documentdb.internal.PathParser.getPathParts(path);
+ if (parts.size() >= 1) {
+ Object value = resource.getObjectByPath(parts);
+ if (value == null || value.getClass() == JSONObject.class) {
+ value = Undefined.Value();
+ }
+
+ return PartitionKeyInternal.fromObjectArray(Arrays.asList(value), false);
+ }
+ }
+
+ return null;
+ }
+
+ public static PartitionKeyInternal fromPartitionKeyvalue(Object partitionKeyValue) {
+     try { // the value is wrapped in a one-element list before delegating to fromObjectArray
+         return PartitionKeyInternal.fromObjectArray(Collections.singletonList(partitionKeyValue), true);
+     } catch (Exception e) {
+         LOGGER.error("Failed to instantiate PartitionKeyInternal from {}", partitionKeyValue, e); // log with the cause, then rethrow
+         throw toRuntimeException(e); // rethrown as an unchecked exception so no throws clause is needed
+     }
+ }
+
+ public static RuntimeException toRuntimeException(Exception e) {
+     // Unchecked exceptions pass through as-is; anything else is wrapped as the cause.
+     if (!(e instanceof RuntimeException)) {
+         return new RuntimeException(e);
+     }
+     return (RuntimeException) e;
+ }
+ }
+
+ final static class PathParser
+ {
+ private final static char SEGMENT_SEPARATOR = '/';
+ private final static String ERROR_MESSAGE_FORMAT = "Invalid path \"%s\", failed at %d"; // args: the full path, the index being examined
+
+ /**
+ * Extract parts from a given path for '/' as the separator.
+ * Every part must start with '/' (otherwise IllegalArgumentException); a part in single or double quotes may itself contain '/'.
+ * This code doesn't do as much validation as the backend, as it assumes that IndexingPolicy path coming from the backend is valid.
+ *
+ * @param path specifies a partition key given as a path.
+ * @return a list of all the parts for '/' as the separator.
+ */
+ public static List getPathParts(String path)
+ {
+ List tokens = new ArrayList();
+ AtomicInteger currentIndex = new AtomicInteger(); // cursor into path; the token helpers below advance it
+
+ while (currentIndex.get() < path.length())
+ {
+ char currentChar = path.charAt(currentIndex.get());
+ if (currentChar != SEGMENT_SEPARATOR)
+ {
+ throw new IllegalArgumentException(
+ String.format(ERROR_MESSAGE_FORMAT, path, currentIndex.get()));
+ }
+
+ if (currentIndex.incrementAndGet() == path.length()) // a trailing '/' ends the path without adding a part
+ {
+ break;
+ }
+
+ currentChar = path.charAt(currentIndex.get());
+ if (currentChar == '\"' || currentChar == '\'')
+ {
+ // Handles the partial path given in quotes such as "'abc/def'"
+ tokens.add(getEscapedToken(path, currentIndex));
+ }
+ else
+ {
+ tokens.add(getToken(path, currentIndex));
+ }
+ }
+
+ return tokens;
+ }
+ // Reads a quoted part: returns the text between the quotes and moves currentIndex just past the closing quote.
+ private static String getEscapedToken(String path, AtomicInteger currentIndex)
+ {
+ char quote = path.charAt(currentIndex.get());
+ int newIndex = currentIndex.incrementAndGet(); // step past the opening quote
+
+ while (true)
+ {
+ newIndex = path.indexOf(quote, newIndex);
+ if (newIndex == -1)
+ {
+ throw new IllegalArgumentException(
+ String.format(ERROR_MESSAGE_FORMAT, path, currentIndex.get()));
+ }
+
+ // Ignore escaped quote in the partial path we look at such as "'abc/def \'12\'/ghi'"
+ if (path.charAt(newIndex - 1) != '\\')
+ {
+ break;
+ }
+
+ ++newIndex;
+ }
+
+ String token = path.substring(currentIndex.get(), newIndex); // quotes excluded; backslashes are kept as-is
+ currentIndex.set(newIndex + 1);
+
+ return token;
+ }
+ // Reads an unquoted part up to the next '/' (or the end of path) and returns it trimmed.
+ private static String getToken(String path, AtomicInteger currentIndex)
+ {
+ int newIndex = path.indexOf(SEGMENT_SEPARATOR, currentIndex.get());
+ String token = null;
+ if (newIndex == -1)
+ {
+ token = path.substring(currentIndex.get());
+ currentIndex.set(path.length());
+ }
+ else
+ {
+ token = path.substring(currentIndex.get(), newIndex);
+ currentIndex.set(newIndex); // leave the cursor on the '/' so the caller's loop consumes it
+ }
+
+ token = token.trim();
+ return token;
+ }
+ }
+}
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/BulkExecutorSettings.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/BulkExecutorSettings.scala
index 9d2e2cb1..0d2d93fa 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/BulkExecutorSettings.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/BulkExecutorSettings.scala
@@ -32,4 +32,7 @@ package com.microsoft.azure.cosmosdb.spark
*/
private[spark] case class BulkExecutorSettings(
maxMiniBatchUpdateCount: Int,
- partitionKeyOption: Option[String])
\ No newline at end of file
+ partitionKeyOption: Option[String],
+ countLoggingPath: Option[String],
+ bulkLoggingPath: Option[String],
+ bulkLoggingCorrelationId: Option[String])
\ No newline at end of file
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/ClientConfiguration.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/ClientConfiguration.scala
index 02e0cd71..cf796c5f 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/ClientConfiguration.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/ClientConfiguration.scala
@@ -25,10 +25,10 @@ package com.microsoft.azure.cosmosdb.spark
import com.microsoft.azure.cosmosdb.spark.config._
import com.microsoft.azure.documentdb._
import com.microsoft.azure.documentdb.internal._
-
import java.lang.management.ManagementFactory
import scala.collection.JavaConversions._
+import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.language.implicitConversions
@@ -50,7 +50,11 @@ private[spark] case class ClientConfiguration(
consistencyLevel: String,
database: String,
container: String,
- bulkConfig: BulkExecutorSettings) {
+ bulkConfig: BulkExecutorSettings,
+ countLoggingPath: Option[String],
+ queryLoggingPath: Option[String],
+ queryLoggingCorrelationId: Option[String],
+ hadoopConfig: mutable.Map[String, String]) {
def getCollectionLink(): String = {
ClientConfiguration.getCollectionLink(database, container)
@@ -59,14 +63,25 @@ private[spark] case class ClientConfiguration(
def getDatabaseLink() : String = {
ClientConfiguration.getDatabaseLink(database)
}
+
+ def getQueryLoggingPath(): Option[String] = { // accessor: returns the queryLoggingPath constructor parameter unchanged
+ queryLoggingPath
+ }
+
+ def getCountLoggingPath(): Option[String] = { // accessor: returns the countLoggingPath constructor parameter unchanged
+ countLoggingPath
+ }
}
object ClientConfiguration extends CosmosDBLoggingTrait {
- def apply(config: Config): ClientConfiguration = {
+ def apply(config: Config, hadoopConfig: mutable.Map[String, String]): ClientConfiguration = {
val database : String = config.get(CosmosDBConfig.Database).get
val collection : String = config.get(CosmosDBConfig.Collection).get
val authConfig : AuthConfig = validateAndCreateAuthConfig(config, database, collection)
val connectionPolicySettings : ConnectionPolicySettings = createConnectionPolicySettings(config)
+ val countLoggingPath = config.get(CosmosDBConfig.CountLoggingPath)
+ val queryLoggingPath = config.get(CosmosDBConfig.QueryLoggingPath)
+ val queryLoggingCorrelationId = config.get(CosmosDBConfig.QueryLoggingCorrelationId)
val bulkExecutorSettings : BulkExecutorSettings = createBulkExecutorSettings(config)
// Get consistency level
@@ -81,7 +96,12 @@ object ClientConfiguration extends CosmosDBLoggingTrait {
consistencyLevel,
database,
collection,
- bulkExecutorSettings)
+ bulkExecutorSettings,
+ countLoggingPath,
+ queryLoggingPath,
+ queryLoggingCorrelationId,
+ hadoopConfig
+ )
}
private def validateAndCreateAuthConfig(config: Config, database: String, collection: String) : AuthConfig = {
@@ -110,9 +130,16 @@ object ClientConfiguration extends CosmosDBLoggingTrait {
val maxMiniBatchUpdateCount: Int = config
.getOrElse(CosmosDBConfig.MaxMiniBatchUpdateCount, CosmosDBConfig.DefaultMaxMiniBatchUpdateCount)
+ val bulkLoggingPath = config.get(CosmosDBConfig.BulkLoggingPath)
+ val countLoggingPath = config.get(CosmosDBConfig.CountLoggingPath)
+ val bulkLoggingCorrelationId = config.get(CosmosDBConfig.BulkLoggingCorrelationId)
+
BulkExecutorSettings(
maxMiniBatchUpdateCount,
- pkDef)
+ pkDef,
+ countLoggingPath,
+ bulkLoggingPath,
+ bulkLoggingCorrelationId)
}
private def createConnectionPolicySettings(config: Config) : ConnectionPolicySettings = {
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala
index ca0e9ffe..c059221e 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala
@@ -23,6 +23,6 @@
package com.microsoft.azure.cosmosdb.spark
object Constants {
- val currentVersion = "2.4.0_2.11-3.6.9"
+ val currentVersion = "2.4.0_2.11-3.6.17-SNAPSHOT"
val userAgentSuffix = s" SparkConnector/$currentVersion"
}
\ No newline at end of file
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala
index bbd4243e..1911c932 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala
@@ -23,6 +23,7 @@
package com.microsoft.azure.cosmosdb.spark
import java.net.SocketTimeoutException
+import java.util.concurrent.Callable
import com.microsoft.azure.cosmosdb.spark.config._
import com.microsoft.azure.documentdb._
@@ -34,6 +35,9 @@ import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.util.control.Breaks._
import com.microsoft.azure.cosmosdb.rx.internal.NotFoundException
+import org.apache.hadoop.conf.Configuration
+
+import scala.collection.mutable
private object CosmosDBConnection {
private val rnd = scala.util.Random
@@ -43,10 +47,10 @@ private object CosmosDBConnection {
}
}
-private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLoggingTrait with Serializable {
+private[spark] case class CosmosDBConnection(config: Config, hadoopConfig: mutable.Map[String, String]) extends CosmosDBLoggingTrait with Serializable {
private val maxPagesPerBatch =
config.getOrElse[String](CosmosDBConfig.ChangeFeedMaxPagesPerBatch, CosmosDBConfig.DefaultChangeFeedMaxPagesPerBatch.toString).toInt
- private val clientConfig = ClientConfiguration(config)
+ val clientConfig = ClientConfiguration(config, hadoopConfig)
def getCollectionLink: String = {
executeWithRetryOnCollectionRecreate(
@@ -58,6 +62,11 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog
CosmosDBConnectionCache.reinitializeClient(clientConfig)
}
+ def flushLogWriter = {
+   // Obtain the client for this configuration from CosmosDBConnectionCache and flush its log writer.
+   CosmosDBConnectionCache.getOrCreateClient(clientConfig).flushLogWriter()
+ }
+
private def getAllPartitionsInternal: List[PartitionKeyRange] = {
val documentClient = CosmosDBConnectionCache.getOrCreateClient(clientConfig)
val ranges = documentClient.readPartitionKeyRanges(getCollectionLink, null.asInstanceOf[FeedOptions])
@@ -232,7 +241,8 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog
logDebug(s"CosmosDBConnection.getIteratorFromFeedResponse -- With continuation - returning query iterator")
val responseIterator:Iterator[T] = response
.getQueryIterator
- responseIterator
+
+ responseIterator
}
}
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnectionCache.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnectionCache.scala
index 98501a23..bdf59695 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnectionCache.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnectionCache.scala
@@ -23,14 +23,19 @@
package com.microsoft.azure.cosmosdb.spark
import java.util.concurrent.ConcurrentHashMap
-import java.util.{Timer, TimerTask}
+import java.util.{Timer, TimerTask, UUID}
import com.microsoft.azure.cosmosdb.spark.config.CosmosDBConfig
+import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils
import com.microsoft.azure.documentdb._
import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor
import com.microsoft.azure.documentdb.internal.routing.PartitionKeyRangeCache
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.SparkSession
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration
import scala.collection.JavaConversions._
+import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.language.implicitConversions
@@ -68,7 +73,9 @@ object CosmosDBConnectionCache extends CosmosDBLoggingTrait {
private val rnd = scala.util.Random
private val refreshDelay : Long = (10 * 60 * 1000) + rnd.nextInt(5 * 60 * 1000) // in 10 - 15 minutes
+ //private val refreshDelay : Long = (1 * 60 * 1000) + rnd.nextInt(1 * 60 * 1000) // in 1 - 2 minutes
private val refreshPeriod : Long = 15 * 60 * 1000 // every 15 minutes
+ //private val refreshPeriod : Long = 2 * 60 * 1000 // every 2 minutes
// main purpose of the time is to allow bulk operations to consume
// additional throughput when more RUs are getting provisioned
private val timerName = "throughput-refresh-timer"
@@ -110,7 +117,7 @@ object CosmosDBConnectionCache extends CosmosDBLoggingTrait {
val effectivelyAvailableThroughputForBulkOperations = getOrReadMaxAvailableThroughput(config)
- val builder = DocumentBulkExecutor.builder
+ var builder = DocumentBulkExecutor.builder
.from(
client,
config.database,
@@ -121,6 +128,24 @@ object CosmosDBConnectionCache extends CosmosDBLoggingTrait {
.withInitializationRetryOptions(bulkExecutorInitializationRetryOptions)
.withMaxUpdateMiniBatchCount(config.bulkConfig.maxMiniBatchUpdateCount)
+ if (config.bulkConfig.bulkLoggingPath.isDefined) {
+ val logWriter = new HdfsLogWriter(
+ config.bulkConfig.bulkLoggingCorrelationId.getOrElse(UUID.randomUUID().toString),
+ config.hadoopConfig.toMap,
+ config.bulkConfig.bulkLoggingPath.get)
+
+ builder = builder.withLogWriter(logWriter)
+ }
+
+ if (config.bulkConfig.countLoggingPath.isDefined) {
+ val logWriter = new HdfsLogWriter(
+ config.bulkConfig.bulkLoggingCorrelationId.getOrElse(UUID.randomUUID().toString),
+ config.hadoopConfig.toMap,
+ config.bulkConfig.countLoggingPath.get)
+
+ builder = builder.withCountLogWriter(logWriter)
+ }
+
// Instantiate DocumentBulkExecutor
val bulkExecutor = builder.build()
@@ -246,6 +271,11 @@ object CosmosDBConnectionCache extends CosmosDBLoggingTrait {
maxAvailableThroughput = None
)
+ oldClientCacheEntry.bulkExecutor match {
+ case Some(bulkExecutor) => bulkExecutor.flushLogs()
+ case None =>
+ }
+
logInfo(s"$timerName: ClientConfiguration#${config.hashCode} has been reset - new " +
s"${newClientCacheEntry.getLogMessage}, previously ${oldClientCacheEntry.getLogMessage}")
@@ -418,12 +448,39 @@ object CosmosDBConnectionCache extends CosmosDBLoggingTrait {
val consistencyLevel = ConsistencyLevel.valueOf(config.consistencyLevel)
lastConsistencyLevel = Some(consistencyLevel)
- new DocumentClient(
+ var client = new DocumentClient(
config.host,
config.authConfig.authKey,
lastConnectionPolicy,
consistencyLevel
)
+
+ client = config.getQueryLoggingPath() match {
+ case Some(path) => {
+ val logger = new HdfsLogWriter(
+ config.queryLoggingCorrelationId.getOrElse(""),
+ config.hadoopConfig.toMap,
+ path)
+
+ client.setLogWriter(logger);
+ }
+ case None => client
+ }
+
+ client = config.getCountLoggingPath() match {
+ case Some(path) => {
+ val logger = new HdfsLogWriter(
+ config.queryLoggingCorrelationId.getOrElse(""),
+ config.hadoopConfig.toMap,
+ path)
+
+ client.setCountLogWriter(logger);
+ }
+ case None => client
+ }
+
+ client
+
}
private def createConnectionPolicy(settings: ConnectionPolicySettings): ConnectionPolicy = {
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala
index 30127fbb..b7d90b06 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala
@@ -24,18 +24,18 @@ package com.microsoft.azure.cosmosdb.spark
import java.io.PrintWriter
import java.io.StringWriter
+import java.lang.management.ManagementFactory
import java.nio.charset.Charset
import java.util.UUID
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
import com.microsoft.azure.cosmosdb.spark.config._
import com.microsoft.azure.cosmosdb.spark.rdd.{CosmosDBRDD, _}
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils
-import rx.Observable
import com.microsoft.azure.documentdb._
import com.microsoft.azure.documentdb.bulkexecutor.{BulkImportResponse, BulkUpdateResponse, DocumentBulkExecutor, UpdateItem}
-import org.apache.spark.{Partition, SparkContext}
+import org.apache.spark.SparkContext
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
@@ -144,6 +144,7 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
*/
def save[D: ClassTag](rdd: RDD[D], writeConfig: Config): Unit = {
var numPartitions = 0
+ val hadoopConfig = HdfsUtils.getConfigurationMap(rdd.sparkContext.hadoopConfiguration)
try {
numPartitions = rdd.getNumPartitions
} catch {
@@ -178,7 +179,7 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
// In this case, users can set maxIngestionTaskParallelism to 32 and will help with the RU consumption based on writeThroughputBudget.
if (maxIngestionTaskParallelism.exists(_ > 0)) numPartitions = maxIngestionTaskParallelism.get
- val cosmosPartitionsCount = CosmosDBConnection(writeConfig).getAllPartitions.length
+ val cosmosPartitionsCount = CosmosDBConnection(writeConfig, hadoopConfig).getAllPartitions.length
// writeThroughputBudget per cosmos db physical partition
writeThroughputBudgetPerCosmosPartition = Some((writeThroughputBudget.get / cosmosPartitionsCount).ceil.toInt)
val baseMiniBatchSizeAdjustmentFactor: Double = (baseMiniBatchRUConsumption.toDouble * numPartitions) / writeThroughputBudgetPerCosmosPartition.get
@@ -192,8 +193,68 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
}
}
- val mapRdd = rdd.coalesce(numPartitions).mapPartitions(savePartition(_, writeConfig, numPartitions,
- baseMaxMiniBatchImportSizeKB * 1024, writeThroughputBudgetPerCosmosPartition), preservesPartitioning = true)
+ val connection: CosmosDBConnection = CosmosDBConnection(writeConfig, hadoopConfig)
+ val cosmosDBRowConverter = new CosmosDBRowConverter(SerializationConfig.fromConfig(connection.config))
+ val iteratorLoggingPath = writeConfig.get[String](CosmosDBConfig.IteratorLoggingPath)
+ val countLoggingPath = writeConfig.get[String](CosmosDBConfig.CountLoggingPath)
+ val iteratorLoggingCorrelationId = writeConfig.get[String](CosmosDBConfig.IteratorLoggingCorrelationId)
+ val rootPropertyToSave = writeConfig.get[String](CosmosDBConfig.RootPropertyToSave)
+ val applicationName: String = writeConfig.getOrElse[String](CosmosDBConfig.ApplicationName, "")
+ val pkDefinition = connection.getPartitionKeyDefinition
+ val userAgentString: String = if (applicationName.isEmpty) {
+ s"${Constants.userAgentSuffix} ${ManagementFactory.getRuntimeMXBean.getName}"
+ } else {
+ s"${Constants.userAgentSuffix} ${ManagementFactory.getRuntimeMXBean.getName} $applicationName"
+ }
+ val mapRdd = rdd.coalesce(numPartitions).mapPartitions(partitionedIterator => {
+ val partitionedCosmosDBRowConverter = new CosmosDBRowConverter(SerializationConfig.fromConfig(connection.config))
+ val partitionedWriter = iteratorLoggingPath match {
+ case Some(path) => Some(new HdfsLogWriter(
+ iteratorLoggingCorrelationId.getOrElse(""),
+ hadoopConfig.toMap,
+ path))
+ case None => None
+ }
+
+ val partitionedCountWriter = countLoggingPath match {
+ case Some(path) => Some(new HdfsLogWriter(
+ iteratorLoggingCorrelationId.getOrElse(""),
+ hadoopConfig.toMap,
+ path))
+ case None => None
+ }
+
+ var iterationLogger : Option[IteratorLogger] = None
+ if (partitionedWriter.isDefined) {
+ iterationLogger = Some(new IteratorLogger(partitionedWriter.get, userAgentString, "n/a", "partitionedIterator"))
+ }
+
+ val counter = new AtomicLong(0)
+ val effectiveIterator = LoggingIterator.createLoggingAndConvertingIterator(
+ partitionedIterator,
+ counter,
+ iterationLogger,
+ pkDefinition,
+ rootPropertyToSave,
+ partitionedCosmosDBRowConverter
+ )
+
+ val returnValue = savePartition(effectiveIterator, writeConfig, hadoopConfig, numPartitions,
+ baseMaxMiniBatchImportSizeKB * 1024, writeThroughputBudgetPerCosmosPartition)
+
+ if (partitionedCountWriter.isDefined) {
+ val countLogger = new CosmosCountLogger(partitionedCountWriter.get)
+ countLogger.logCount("WithinMapPartitions", "I", counter.get(), "", "")
+ partitionedCountWriter.get.flush()
+ }
+
+ if (partitionedWriter.isDefined) {
+ iterationLogger.get.flush()
+ partitionedWriter.get.flush()
+ }
+
+ returnValue
+ }, true)
mapRdd.collect()
}
@@ -381,6 +442,8 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
throw toFailedImportException(bulkImportResponse, connection)
}
}
+
+ importer.flushLogs()
}
private def toFailedImportException(response: BulkImportResponse, connection: CosmosDBConnection) : Exception = {
@@ -441,10 +504,11 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
private def savePartition[D: ClassTag](iter: Iterator[D],
config: Config,
+ hadoopConfig: mutable.Map[String, String],
partitionCount: Int,
baseMaxMiniBatchImportSize: Int,
writeThroughputBudgetPerCosmosPartition: Option[Int]): Iterator[D] = {
- val connection: CosmosDBConnection = CosmosDBConnection(config)
+ val connection: CosmosDBConnection = CosmosDBConnection(config, hadoopConfig)
val asyncConnection: AsyncCosmosDBConnection = new AsyncCosmosDBConnection(config)
val isBulkImporting = config.get[String](CosmosDBConfig.BulkImport).
@@ -745,7 +809,7 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
/**
* The CosmosDBSpark class
*
- * '''Note:''' Creation of the class should be via [[CosmosDBSpark$.Builder]].
+ * '''Note:''' Creation of the class should be via [[CosmosDBSpark Builder]].
*
* @since 1.0
*/
@@ -757,7 +821,6 @@ case class CosmosDBSpark(sparkSession: SparkSession, readConfig: Config) {
/**
* Creates a `RDD` for the collection
*
- * @tparam D the datatype for the collection
* @return a CosmosDBRDD[D]
*/
def toRDD: CosmosDBRDD = rdd
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/DefaultSource.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/DefaultSource.scala
index b86b5c31..921d2341 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/DefaultSource.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/DefaultSource.scala
@@ -24,6 +24,7 @@ package com.microsoft.azure.cosmosdb.spark
import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig}
import com.microsoft.azure.cosmosdb.spark.schema.CosmosDBRelation
+import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType
@@ -62,7 +63,7 @@ class DefaultSource extends RelationProvider
data: DataFrame): BaseRelation = {
val config: Config = Config(sqlContext.sparkContext.getConf, parameters)
- val connection: CosmosDBConnection = CosmosDBConnection(config)
+ val connection: CosmosDBConnection = CosmosDBConnection(config, HdfsUtils.getConfigurationMap(sqlContext.sparkSession.sparkContext.hadoopConfiguration))
val isEmptyCollection: Boolean = connection.isDocumentCollectionEmpty
mode match{
case Append =>
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/HdfsLogWriter.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/HdfsLogWriter.scala
new file mode 100644
index 00000000..857d07ce
--- /dev/null
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/HdfsLogWriter.scala
@@ -0,0 +1,144 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2016 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.cosmosdb.spark
+
+import java.io.Closeable
+import java.util.{Timer, TimerTask, UUID}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils
+import com.microsoft.azure.documentdb.CosmosLogWriter
+import org.apache.spark.SparkEnv
+import org.joda.time.Instant
+
+import scala.collection.concurrent.TrieMap
+import scala.util.Properties
+
+private object HdfsLogWriter extends CosmosDBLoggingTrait {
+  private val timerName = "hdfsLogWriter-cleanup-Timer"
+  private val timer: Timer = new Timer(timerName, true)
+  private val cleanupIntervalInMs = 60000
+  private val writerCount = new AtomicInteger(0)
+  val targetedMemoryBufferSizeInBytes = 50000000
+
+  val lineSeparator = Properties.lineSeparator
+  val logWriters = new TrieMap[String, HdfsLogWriter] // live writers keyed by HdfsLogWriter.id
+
+  /** Registers the writer under its id (never replacing an existing entry); the first one starts the timer. */
+  def registerWriter(writer: HdfsLogWriter): Unit = {
+    logWriters.putIfAbsent(writer.id, writer) match {
+      case Some(_) =>
+        throw new IllegalStateException(s"Already a writer '${writer.id}' registered.'")
+      case None => if (writerCount.incrementAndGet() == 1) startCleanupTimer()
+    }
+  }
+
+  /** Removes the writer by its id, i.e. the same key registerWriter stored it under. */
+  def deregisterWriter(writer: HdfsLogWriter): Unit = {
+    logWriters.remove(writer.id)
+  }
+
+  private def startCleanupTimer() : Unit = {
+    logInfo(s"$timerName: scheduling timer - delay: $cleanupIntervalInMs ms, period: $cleanupIntervalInMs ms")
+    timer.schedule(
+      new TimerTask { def run(): Unit = onCleanup() },
+      cleanupIntervalInMs,
+      cleanupIntervalInMs)
+  }
+
+  /** Timer callback: flushes writers holding data whose last flush (lastFlushed > 0) is over one interval old. */
+  private def onCleanup() : Unit = {
+    logInfo(s"$timerName: onCleanup")
+    val threshold = Instant.now().getMillis - cleanupIntervalInMs
+    logWriters.readOnlySnapshot().values.foreach(writer => {
+      val lastFlushed = writer.lastFlushed.get()
+      if (lastFlushed > 0 && lastFlushed < threshold && writer.hasData) {
+        // An exception escaping a TimerTask terminates the Timer thread, so failures are contained per writer.
+        try writer.flush() catch { case scala.util.control.NonFatal(e) => logInfo(s"$timerName: flush failed for writer '${writer.id}' - $e") }
+      }
+    })
+  }
+}
+
+private case class HdfsLogWriter
+(
+  correlationId: String,
+  configMap: Map[String, String],
+  loggingLocation: String
+) extends CosmosLogWriter with Closeable with CosmosDBLoggingTrait {
+  // Buffers log lines in memory and writes them to loggingLocation, one new file per flush.
+  // Dedicated monitor: a "" literal is interned and would be one lock shared by every instance.
+  private[this] val inMemoryLock = new Object
+  val executorId: String = SparkEnv.get.executorId
+  private[this] val fileId = new AtomicInteger(0)
+  private[this] val sb: StringBuilder = new StringBuilder()
+  private[this] lazy val hdfsUtils = new HdfsUtils(configMap, loggingLocation)
+  val lastFlushed = new AtomicLong(-1)
+
+  val id = s"${correlationId}_${executorId}_${loggingLocation}_${UUID.randomUUID()}"
+  HdfsLogWriter.registerWriter(this)
+  logInfo("HdfsBulkLogWriter instantiated.")
+
+  /** Appends the line (characters below ' ' removed) to the buffer, flushing first if it would reach the size target. */
+  override def writeLine(line: String): Unit = {
+    if (line != null) {
+      val prettyLine = line.filter(_ >= ' ') + HdfsLogWriter.lineSeparator
+      logDebug(s"PrettyLine: $prettyLine")
+      this.inMemoryLock.synchronized {
+        if (sb.length + prettyLine.length >= HdfsLogWriter.targetedMemoryBufferSizeInBytes) {
+          this.flush()
+        }
+        this.sb.append(prettyLine)
+      }
+    }
+  }
+
+  /** Writes the buffered text, if any, to a new uniquely named file and records the flush time. */
+  override def flush(): Unit = {
+    val contentToFlush: Option[String] = this.inMemoryLock.synchronized {
+      if (this.sb.length == 0) None
+      else {
+        val pending = this.sb.toString()
+        this.sb.clear()
+        Some(pending)
+      }
+    }
+    logInfo(s"Flush: ${contentToFlush.map(_.length).getOrElse(0)}")
+    contentToFlush.foreach { content =>
+      val fileName = s"${correlationId}_${executorId}_${this.fileId.incrementAndGet()}_${UUID.randomUUID().toString()}.log"
+      logInfo(s"WriteLogFile: ${fileName} - ${content.length} bytes")
+      hdfsUtils.writeLogFile(this.loggingLocation, fileName, content)
+      lastFlushed.set(Instant.now().getMillis)
+    }
+  }
+
+  /** True when unflushed text is buffered; read under the lock because the cleanup timer thread calls this. */
+  def hasData = {
+    this.inMemoryLock.synchronized { this.sb.length > 0 }
+  }
+
+  override def close(): Unit = {
+    logInfo("Close")
+    this.flush
+    HdfsLogWriter.deregisterWriter(this)
+  }
+}
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/LoggingIterator.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/LoggingIterator.scala
new file mode 100644
index 00000000..afa13b6b
--- /dev/null
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/LoggingIterator.scala
@@ -0,0 +1,107 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2016 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.cosmosdb.spark
+
+import java.util.concurrent.atomic.AtomicLong
+
+import com.microsoft.azure.cosmosdb.spark.schema.CosmosDBRowConverter
+import com.microsoft.azure.documentdb.{Document, PartitionKeyDefinition}
+import org.apache.spark.sql.Row
+
+import scala.reflect.ClassTag
+
+private[spark] object LoggingIterator {
+  /**
+   * Wraps `inner` so that each element is reported to `logger` as a Document; the element
+   * itself is passed through unchanged. Conversion failures are logged and rethrown.
+   * @param rootPropertyToSave   when defined, the Row column whose string value becomes the Document
+   * @param cosmosDBRowConverter converts a Row to JSON when `rootPropertyToSave` is empty
+   */
+  def createLoggingIterator[D: ClassTag]
+  (
+    inner: Iterator[D],
+    logger: IteratorLogger,
+    partitionKeyDefinition: PartitionKeyDefinition,
+    rootPropertyToSave: Option[String],
+    cosmosDBRowConverter: CosmosDBRowConverter): Iterator[D] = {
+    inner.map(input => {
+      try {
+        val document = toDocument(input, rootPropertyToSave, cosmosDBRowConverter)
+        logger.onIteratorNext(document, partitionKeyDefinition)
+        input
+      } catch {
+        case t: Throwable => {
+          logger.logError("Failure converting RDD item", t)
+          throw t
+        }
+      }
+    })
+  }
+
+  /**
+   * Wraps `inner` so that each element is converted to a Document, optionally logged,
+   * and counted; the converted Document is what the returned iterator yields.
+   *
+   * @param counter incremented once per successfully converted element
+   * @param logger  optional logger; when empty, elements are only converted and counted
+   * @return an iterator of the converted documents; failures are rethrown after optional logging
+   */
+  def createLoggingAndConvertingIterator[D: ClassTag]
+  (
+    inner: Iterator[D],
+    counter: AtomicLong,
+    logger: Option[IteratorLogger],
+    partitionKeyDefinition: PartitionKeyDefinition,
+    rootPropertyToSave: Option[String],
+    cosmosDBRowConverter: CosmosDBRowConverter): Iterator[Document] = {
+    inner.map(input => {
+      try {
+        val document = toDocument(input, rootPropertyToSave, cosmosDBRowConverter)
+        logger.foreach(_.onIteratorNext(document, partitionKeyDefinition))
+        counter.incrementAndGet()
+        document
+      } catch {
+        case t: Throwable => {
+          logger.foreach(_.logError("Failure converting RDD item", t))
+          throw t
+        }
+      }
+    })
+  }
+
+  /** Shared conversion: Documents pass through, Rows are parsed or converted, anything else goes through toString. */
+  private def toDocument(
+    input: Any,
+    rootPropertyToSave: Option[String],
+    cosmosDBRowConverter: CosmosDBRowConverter): Document = input match {
+    case doc: Document => doc
+    case row: Row =>
+      rootPropertyToSave match {
+        case Some(property) => new Document(row.getString(row.fieldIndex(property)))
+        case None => new Document(cosmosDBRowConverter.rowToJSONObject(row).toString())
+      }
+    case any => new Document(any.toString)
+  }
+}
+
+
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala
index 8891815f..19f07e1c 100755
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala
@@ -95,7 +95,15 @@ object CosmosDBConfig {
val DefaultMaxTransientRetryDelayInMs = 100 // 0.1 second
val DefaultPoisonMessageLocation = ""
val DefaultTreatUnknownExceptionsAsTransient = true
-
+
+ val BulkLoggingPath = "bulkLoggingPath"
+ val BulkLoggingCorrelationId = "bulkLoggingCorrelationId"
+ val QueryLoggingPath = "queryLoggingPath"
+ val QueryLoggingCorrelationId = "queryLoggingCorrelationId"
+ val IteratorLoggingPath = "iteratorLoggingPath"
+ val CountLoggingPath = "countLoggingPath"
+ val IteratorLoggingCorrelationId = "iteratorLoggingCorrelationId"
+
// Not a config, constant
val StreamingTimestampToken = "tsToken"
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala
index dad76eff..bb5193a7 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala
@@ -26,19 +26,20 @@ import com.microsoft.azure.cosmosdb.spark.config._
import com.microsoft.azure.cosmosdb.spark.schema.FilterConverter
import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils
import com.microsoft.azure.cosmosdb.spark.{CosmosDBConnection, CosmosDBLoggingTrait}
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.Partition
import org.apache.spark.sql.sources.Filter
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
-class CosmosDBPartitioner() extends Partitioner[Partition] with CosmosDBLoggingTrait {
+class CosmosDBPartitioner(hadoopConfig: mutable.Map[String, String]) extends Partitioner[Partition] with CosmosDBLoggingTrait {
/**
* @param config Partition configuration
*/
override def computePartitions(config: Config): Array[Partition] = {
- val connection: CosmosDBConnection = CosmosDBConnection(config)
+ val connection: CosmosDBConnection = CosmosDBConnection(config, hadoopConfig)
val partitionKeyRanges = connection.getAllPartitions
logDebug(s"CosmosDBPartitioner: This CosmosDB has ${partitionKeyRanges.length} partitions")
Array.tabulate(partitionKeyRanges.length){
@@ -47,7 +48,7 @@ class CosmosDBPartitioner() extends Partitioner[Partition] with CosmosDBLoggingT
}
def computePartitions(config: Config, requiredColumns: Array[String] = Array()): Array[Partition] = {
- val connection: CosmosDBConnection = CosmosDBConnection(config)
+ val connection: CosmosDBConnection = CosmosDBConnection(config, hadoopConfig)
connection.reinitializeClient()
// CosmosDB source
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala
index 63f0db55..09cf0a6a 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala
@@ -25,7 +25,7 @@ package com.microsoft.azure.cosmosdb.spark.rdd
import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig}
import com.microsoft.azure.cosmosdb.spark.partitioner.{CosmosDBPartition, CosmosDBPartitioner}
import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils
-import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
+import com.microsoft.azure.cosmosdb.spark.{CosmosDBConnectionCache, CosmosDBSpark}
import com.microsoft.azure.documentdb._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.Filter
@@ -40,7 +40,7 @@ class CosmosDBRDD(
spark: SparkSession,
config: Config,
maxItems: Option[Long] = None,
- partitioner: CosmosDBPartitioner = new CosmosDBPartitioner(),
+ partitionerRaw: CosmosDBPartitioner = null,
requiredColumns: Array[String] = Array(),
filters: Array[Filter] = Array())
extends RDD[Document](spark.sparkContext, deps = Nil) {
@@ -49,6 +49,7 @@ class CosmosDBRDD(
// It's a Map because Configuration is not serializable
private val hadoopConfig: mutable.Map[String, String] = HdfsUtils.getConfigurationMap(sparkContext.hadoopConfiguration)
+ private val effectivePartitioner: CosmosDBPartitioner = Option.apply(partitionerRaw).getOrElse(new CosmosDBPartitioner(hadoopConfig))
private def cosmosDBSpark = {
CosmosDBSpark(spark, config)
@@ -57,7 +58,7 @@ class CosmosDBRDD(
override def toJavaRDD(): JavaCosmosDBRDD = JavaCosmosDBRDD(this)
override def getPartitions: Array[Partition] = {
- partitioner.computePartitions(config)
+ effectivePartitioner.computePartitions(config)
}
/**
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
index 7906ab71..c78edc8a 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
@@ -53,12 +53,14 @@ object CosmosDBRDDIterator {
// For verification purpose
var lastFeedOptions: FeedOptions = _
var hdfsUtils: HdfsUtils = _
+ var hadoopConfig: mutable.Map[String, String] = _
def initializeHdfsUtils(hadoopConfig: Map[String, String], changeFeedCheckpointLocation: String): Any = {
if (hdfsUtils == null) {
this.synchronized {
if (hdfsUtils == null) {
hdfsUtils = HdfsUtils(hadoopConfig, changeFeedCheckpointLocation)
+ this.hadoopConfig = collection.mutable.Map(hadoopConfig.toSeq: _*)
}
}
}
@@ -79,7 +81,7 @@ object CosmosDBRDDIterator {
* @return the corresponding global continuation token
*/
def getCollectionTokens(config: Config, shouldGetCurrentToken: Boolean = false): String = {
- val connection = CosmosDBConnection(config)
+ val connection = CosmosDBConnection(config, this.hadoopConfig)
val collectionLink = connection.getCollectionLink
val queryName = config
.get[String](CosmosDBConfig.ChangeFeedQueryName).get
@@ -155,9 +157,15 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
private val maxRetryCountOnServiceUnavailable: Int = 100
private val rnd = scala.util.Random
+ logInfo(s"CosmosDBRDDIterator initialized for PK range id ${partition.partitionKeyRangeId}")
+
lazy val reader: Iterator[Document] = {
initialized = true
- val connection: CosmosDBConnection = CosmosDBConnection(config)
+ val connection: CosmosDBConnection = CosmosDBConnection(config, this.hadoopConfig)
+ taskContext.addTaskCompletionListener((ctx: TaskContext) => {
+ logInfo(s"CosmosDBRDDIterator: Flushing LogWriter after completing task for partition key range id ${partition.partitionKeyRangeId}")
+ connection.flushLogWriter
+ })
val readingChangeFeed: Boolean = config
.get[String](CosmosDBConfig.ReadChangeFeed)
@@ -447,6 +455,7 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
})
if (!readingChangeFeed) {
+ logInfo(s"--> query document for pk range id ${partition.partitionKeyRangeId}")
queryDocuments
} else {
readChangeFeed
@@ -486,6 +495,8 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
}
itemCount = itemCount + 1
reader.next()
+
+ // TODO @fabianm AddLog
}
def closeIfNeeded(): Unit = {
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBWriteStreamRetryPolicy.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBWriteStreamRetryPolicy.scala
index 766f34aa..aa95d7fc 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBWriteStreamRetryPolicy.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBWriteStreamRetryPolicy.scala
@@ -111,7 +111,7 @@ class CosmosDBWriteStreamRetryPolicy(configMap: Map[String, String])
requestOptions,
task,
this.config.isTransient,
- loggingAction = (msg: String) => logDebug(msg),
+ (msg: String) => logDebug(msg),
(throwable: Throwable, document: Document) => this.notificationHandler.onPoisonMessage(throwable, document),
this.rnd,
maxRetries,
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala
index 9f713929..ee53c1ec 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala
@@ -22,13 +22,13 @@
*/
package com.microsoft.azure.cosmosdb.spark.util
-import java.io.{FileNotFoundException, PrintWriter, StringWriter}
+import java.io.{BufferedOutputStream, FileNotFoundException, OutputStream, PrintWriter, StringWriter}
import java.util
import com.microsoft.azure.cosmosdb.spark.CosmosDBLoggingTrait
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
+import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, LocatedFileStatus, Path, RemoteIterator}
import scala.collection.mutable
import java.net.URI
@@ -54,6 +54,16 @@ case class HdfsUtils(configMap: Map[String, String], changeFeedCheckpointLocatio
}
}
+ /**
+   * Writes `content`, encoded as UTF-8, to the file at `base`/`filePath`.
+   *
+   * The write is wrapped in `retry(maxRetryCount)`, so a failed attempt may be
+   * repeated; every attempt creates the file anew via `fs.create`.
+   * NOTE(review): presumably `fs` is a Hadoop `FileSystem`, whose `create(Path)`
+   * overwrites an existing file - confirm.
+   *
+   * @param base     directory (or URI prefix) the file is placed under
+   * @param filePath path of the file relative to `base`
+   * @param content  text to write
+   */
+ def writeLogFile(base: String, filePath: String, content: String): Unit = {
+   val path = new Path(base + "/" + filePath)
+   retry(maxRetryCount) {
+     val bos = new BufferedOutputStream(fs.create(path))
+     try {
+       bos.write(content.getBytes("UTF-8"))
+     } finally {
+       // Always release the stream, even when the write fails, so that a failed
+       // (and possibly retried) attempt does not leak an open file handle.
+       bos.close()
+     }
+   }
+ }
+
def read(base: String, filePath: String, alternateQueryName: String, collectionRid: String): String = {
val path = new Path(base + "/" + filePath)
read(path, base, alternateQueryName, collectionRid)