
Commit 6e0a666

#62: Implement splitPath (#64)
* `splitPath` function implemented and used
* Updated UTs
* Added `keepEmptyFields: Boolean` to `splitPath`
1 parent 435b3c8 commit 6e0a666

File tree

5 files changed: +73 -9 lines changed

README.md

Lines changed: 8 additions & 0 deletions
@@ -74,6 +74,14 @@ _SchemaUtils_ provides methods for working with schemas, its comparison and alignment
     SchemaUtils.appendPath(path, fieldName)
     ```
 
+5. Separates the field name components of a fully qualified column name as their hierarchy goes from root down to the
+   deepest one.
+
+    ```scala
+    SchemaUtils.splitPath(columnName, keepEmptyFields = true)
+    ```
+
+
 ### JsonUtils
 
 _Json Utils_ provides methods for working with Json, both on input and output.
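
For reference, a minimal sketch of the call documented above, based on the `splitPath` implementation added in this commit (the example values are illustrative):

```scala
import za.co.absa.spark.commons.utils.SchemaUtils

// Split a fully qualified column name into its per-level field names.
val levels = SchemaUtils.splitPath("com.my.package.xyz")
// levels == List("com", "my", "package", "xyz")

// With keepEmptyFields = false, empty segments (e.g. from a leading '.') are dropped.
val cleaned = SchemaUtils.splitPath(".com.my", keepEmptyFields = false)
// cleaned == List("com", "my")
```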

spark-commons/src/main/scala/za/co/absa/spark/commons/implicits/StructTypeImplicits.scala

Lines changed: 6 additions & 5 deletions
@@ -22,6 +22,7 @@ import org.apache.spark.sql.types._
 import za.co.absa.spark.commons.adapters.TransformAdapter
 import za.co.absa.spark.commons.implicits.DataTypeImplicits.DataTypeEnhancements
 import za.co.absa.spark.commons.implicits.StructFieldImplicits.StructFieldEnhancements
+import za.co.absa.spark.commons.utils.SchemaUtils
 import za.co.absa.spark.commons.utils.SchemaUtils.{getAllArraySubPaths, isCommonSubPath}
 
 import scala.annotation.tailrec
@@ -102,7 +103,7 @@ object StructTypeImplicits {
       }
     }
 
-    val pathTokens = path.split('.').toList
+    val pathTokens = SchemaUtils.splitPath(path)
     Try{
       examineStructField(pathTokens.tail, schema(pathTokens.head))
     }.getOrElse(None)
@@ -192,7 +193,7 @@ object StructTypeImplicits {
      * @return true if the column is the only column in a struct
      */
    def isOnlyField(path: String): Boolean = {
-     val pathSegments = path.split('.')
+     val pathSegments = SchemaUtils.splitPath(path)
      evaluateConditionsForField(schema, pathSegments, path, applyArrayHelper = false, applyLeafCondition = true,
        field => field.fields.length == 1)
    }
@@ -330,7 +331,7 @@ object StructTypeImplicits {
       }
     }
 
-    val pathToks = path.split('.')
+    val pathToks = SchemaUtils.splitPath(path)
     helper(pathToks, Seq()).mkString(".")
   }
 
@@ -358,7 +359,7 @@ object StructTypeImplicits {
       }
     }
 
-    val pathToks = path.split("\\.")
+    val pathToks = SchemaUtils.splitPath(path)
     helper(pathToks, Seq(), Seq())
   }
 
@@ -411,7 +412,7 @@ object StructTypeImplicits {
    * @return true if a field is an array that is not nested in another array
    */
   def isNonNestedArray(path: String): Boolean = {
-    val pathSegments = path.split('.')
+    val pathSegments = SchemaUtils.splitPath(path)
    evaluateConditionsForField(schema, pathSegments, path, applyArrayHelper = false)
  }
}
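
Each hunk above swaps a raw `String.split` for the new helper. A small sketch of where the two differ, assuming the `splitPath` behavior defined later in this commit:

```scala
import za.co.absa.spark.commons.utils.SchemaUtils

// String.split on an empty string yields a single empty token...
val raw = "".split('.').toList            // List("")

// ...while splitPath returns an empty list, and always returns List[String]
// (no .toList conversion or "\\." regex escaping needed at the call sites).
val viaHelper = SchemaUtils.splitPath("") // List.empty
```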

spark-commons/src/main/scala/za/co/absa/spark/commons/utils/ExplodeTools.scala

Lines changed: 3 additions & 3 deletions
@@ -252,7 +252,7 @@ object ExplodeTools {
     }
 
     val newFieldName = inputDf.schema.getClosestUniqueName(deconstructedColumnName)
-    val resultDf = inputDf.select(processStruct(inputDf.schema, columnName.split('.'), None)
+    val resultDf = inputDf.select(processStruct(inputDf.schema, SchemaUtils.splitPath(columnName), None)
       :+ col(columnName).as(newFieldName): _*)
     DeconstructedNestedField(resultDf, newFieldName, transientColName)
   }
@@ -287,7 +287,7 @@ object ExplodeTools {
       val newFields2 = if (isColumnToFound) newFields else newFields :+ col(columnFrom).as(columnTo)
       inputDf.select(newFields2: _*)
     } else {
-      putFieldIntoNestedStruct(inputDf, columnFrom, columnTo.split('.'), positionColumn)
+      putFieldIntoNestedStruct(inputDf, columnFrom, SchemaUtils.splitPath(columnTo), positionColumn)
     }
   }
 
@@ -342,7 +342,7 @@ object ExplodeTools {
 
   private def addSuperTransientField(inputDf: DataFrame, arrayColPathName: String): (DataFrame, String) = {
     val colName = inputDf.schema.getClosestUniqueName(superTransientColumnName)
-    val nestedColName = (arrayColPathName.split('.').dropRight(1) :+ colName).mkString(".")
+    val nestedColName = (SchemaUtils.splitPath(arrayColPathName).dropRight(1) :+ colName).mkString(".")
     val df = inputDf.nestedWithColumn(nestedColName, lit(null))
     (df, nestedColName)
   }
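
The last hunk rewrites the sibling-path construction in `addSuperTransientField`; a worked example of that expression, with illustrative names:

```scala
import za.co.absa.spark.commons.utils.SchemaUtils

// Replace the leaf of a nested path with a new column name:
// "a.b.c" -> List("a", "b", "c") -> drop the leaf -> append the new name.
val nested = (SchemaUtils.splitPath("a.b.c").dropRight(1) :+ "tmp").mkString(".")
// nested == "a.b.tmp"
```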

spark-commons/src/main/scala/za/co/absa/spark/commons/utils/SchemaUtils.scala

Lines changed: 39 additions & 1 deletion
@@ -74,7 +74,7 @@ object SchemaUtils {
     }
 
     var isParentCommon = true // For Seq() the property holds by [my] convention
-    var restOfPaths: Seq[Seq[String]] = paths.map(_.split('.').toSeq).filter(_.nonEmpty)
+    var restOfPaths: Seq[Seq[String]] = paths.map(SchemaUtils.splitPath).filter(_.nonEmpty)
     while (isParentCommon && restOfPaths.nonEmpty) {
       val parent = restOfPaths.head.head
       isParentCommon = restOfPaths.forall(path => path.head == parent)
@@ -100,6 +100,44 @@ object SchemaUtils {
     }
   }
 
+
+  /**
+    * Separates the field name components of a fully qualified column name as their hierarchy goes from the root down
+    * to the deepest one. No validation of the field names is done.
+    * Example: `"com.my.package.xyz"` -> `List("com", "my", "package", "xyz")`
+    * A trailing '.' is ignored; a leading one is not.
+    *
+    * @param columnName A fully qualified column name
+    * @return The field name of each level, in the order they go from the root to the deepest one
+    */
+  def splitPath(columnName: String): List[String] = splitPath(columnName, keepEmptyFields = true)
+
+  /**
+    * Separates the field name components of a fully qualified column name as their hierarchy goes from the root down
+    * to the deepest one. No validation of the field names is done.
+    * The function is overloaded rather than using a default parameter, for easier use in functions like `map`.
+    * Example: `"com.my.package.xyz"` -> `List("com", "my", "package", "xyz")`
+    * A trailing '.' is ignored; a leading one is not.
+    *
+    * @param columnName      A fully qualified column name
+    * @param keepEmptyFields If `false`, any empty field names are removed from the result list; otherwise they are kept
+    * @return The field name of each level, in the order they go from the root to the deepest one
+    */
+  def splitPath(columnName: String, keepEmptyFields: Boolean): List[String] = {
+    val stripped = columnName.stripSuffix(".")
+
+    if (stripped.isEmpty) {
+      List.empty
+    } else {
+      val segments = stripped.split('.').toList
+      if (keepEmptyFields) {
+        segments
+      } else {
+        segments.filterNot(_.isEmpty)
+      }
+    }
+  }
+
   private def columnPathAndCore(columnName: String): (String, String) = {
     val index = columnName.lastIndexOf('.')
     if (index >= 0) {
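
The scaladoc above calls out why `splitPath` is overloaded instead of taking a default parameter; a short illustration of that design choice (the `paths` value is illustrative):

```scala
val paths = Seq("a.b", "a.b.c")

// The single-argument overload eta-expands cleanly into a String => List[String],
// exactly as used in isCommonSubPath above:
val tokens = paths.map(SchemaUtils.splitPath)

// With `keepEmptyFields: Boolean = true` as a default parameter instead,
// the same call site would not compile and would need an explicit lambda:
// paths.map(p => SchemaUtils.splitPath(p))
```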

spark-commons/src/test/scala/za/co/absa/spark/commons/utils/SchemaUtilsTest.scala

Lines changed: 17 additions & 0 deletions
@@ -46,4 +46,21 @@ class SchemaUtilsTest extends AnyFunSuite with Matchers {
     assert (!isCommonSubPath("a.b.c.d.e.f", "a.b.c.x", "a.b.c", "a.b", "a"))
   }
 
+  test("Test splitPath") {
+    assertResult(List("a", "b", "c", "d", "e"))(splitPath("a.b.c.d.e"))
+    assertResult(List("a"))(splitPath("a"))
+    assertResult(List("a", "bcd"))(splitPath("a.bcd"))
+    assertResult(List("a", "bcd"))(splitPath("a.bcd."))
+    assertResult(List("", "a", "bcd"))(splitPath(".a.bcd"))
+    assertResult(List.empty[String])(splitPath(""))
+    assertResult(List.empty[String])(splitPath("."))
+  }
+
+  test("Test splitPath with removing empty fields") {
+    assertResult(List("a", "b", "c", "d", "e"))(splitPath("a.b.c.d.e", keepEmptyFields = false))
+    assertResult(List("a", "e"))(splitPath("a....e", keepEmptyFields = false))
+    assertResult(List("a", "bcd"))(splitPath(".a.bcd", keepEmptyFields = false))
+    assertResult(List.empty[String])(splitPath("", keepEmptyFields = false))
+    assertResult(List.empty[String])(splitPath(".", keepEmptyFields = false))
+  }
 }
