Skip to content

(dsl): Support IP range aggregation #650

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5,803 changes: 2,937 additions & 2,866 deletions modules/integration/src/test/scala/zio/elasticsearch/HttpExecutorSpec.scala
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file still have issues with format, and also it has special ^M characters

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,45 @@ object ElasticAggregation {
final def filterAggregation(name: String, query: ElasticQuery[_]): FilterAggregation =
Filter(name = name, query = query, subAggregations = Chunk.empty)

/**
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.IpRangeAggregation]] using the specified
* parameters.
*
* @param name
* Aggregation name.
* @param field
* The field for which the IP range aggregation will be executed
* @param ranges
* A chunk of IP range bounds specifying the ranges
* @param subAggregations
* Optional map of sub-aggregations to nest within this aggregation
* @return
* An instance of [[IpRangeAggregation]] that represents filter aggregation to be performed.
*/
def ipRangeAggregation(
name: String,
field: Field[_, String],
ranges: Chunk[IpRange.IpRangeBound]
): IpRangeAggregation =
IpRange(name = name, field = field.toString, ranges = ranges, keyed = None, subAggregations = None)

/**
* Constructs an instance of [[zio.elasticsearch.aggregation.IpRangeAggregation]] using the specified parameters.
*
* @param name
* Aggregation name
* @param field
* The field (as string) for which the IP range aggregation will be executed
* @param ranges
* A chunk of IP range bounds specifying the ranges
* @param subAggregations
* Optional map of sub-aggregations to nest within this aggregation
* @return
* An instance of [[IpRangeAggregation]] configured with the provided parameters.
*/
def ipRangeAggregation(name: String, field: String, ranges: Chunk[IpRange.IpRangeBound]): IpRangeAggregation =
IpRange(name = name, field = field, ranges = ranges, keyed = None, subAggregations = None)

/**
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.MaxAggregation]] using the specified parameters.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,62 @@ private[elasticsearch] final case class Filter(
}
}

sealed trait IpRangeAggregation extends SingleElasticAggregation with WithAgg with WithSubAgg[IpRangeAggregation]

final case class IpRange(
name: String,
field: String,
ranges: Chunk[IpRange.IpRangeBound],
keyed: Option[Boolean],
subAggregations: Option[Chunk[SingleElasticAggregation]]
) extends IpRangeAggregation { self =>

def keyed(value: Boolean): IpRangeAggregation = self.copy(keyed = Some(value))

def withAgg(aggregation: SingleElasticAggregation): MultipleAggregations =
multipleAggregations.aggregations(self, aggregation)

def withSubAgg(aggregation: SingleElasticAggregation): IpRangeAggregation =
self.copy(subAggregations = Some(aggregation +: subAggregations.getOrElse(Chunk.empty)))

private[elasticsearch] def toJson: Json = {

val rangesJson = ranges.map(_.toJson)
val keyedJson = keyed.fold(Obj())(k => Obj("keyed" -> k.toJson))
val subAggsJson = subAggregations match {
case Some(aggs) if aggs.nonEmpty =>
Obj("aggs" -> aggs.map(_.toJson).reduce(_ merge _))
case _ => Obj()
}

Obj(
name -> (
Obj("ip_range" -> (Obj("field" -> field.toJson, "ranges" -> Arr(rangesJson)) merge keyedJson)) merge subAggsJson
)
)
}
}

object IpRange {

final case class IpRangeBound(
from: Option[String] = None,
to: Option[String] = None,
mask: Option[String] = None,
key: Option[String] = None
) {
def toJson: Json = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put private[elasticsearch] here?

val baseFields = Chunk.empty[(String, Json)] ++
from.map("from" -> _.toJson) ++
to.map("to" -> _.toJson) ++
mask.map("mask" -> _.toJson) ++
key.map("key" -> _.toJson)

Obj(baseFields: _*)
}
}
}

sealed trait MaxAggregation extends SingleElasticAggregation with HasMissing[MaxAggregation] with WithAgg

private[elasticsearch] final case class Max(name: String, field: String, missing: Option[Double])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ object AggregationResponse {
(key, toResult(response))
})
)
case IpRangeAggregationResponse(buckets) =>
IpRangeAggregationResult(
buckets.map(b =>
IpRangeAggregationBucketResult(
key = b.key,
from = b.from,
to = b.to,
docCount = b.docCount,
subAggregations = Map.empty
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dbulaja98, up there, we have withSubAggregations. Is this correct implementation then?

)
)
)
case MaxAggregationResponse(value) =>
MaxAggregationResult(value)
case MinAggregationResponse(value) =>
Expand Down Expand Up @@ -157,6 +169,8 @@ private[elasticsearch] case class BucketDecoder(fields: Chunk[(String, Json)]) e
)
case str if str.contains("filter#") =>
Some(field -> data.unsafeAs[FilterAggregationResponse](FilterAggregationResponse.decoder))
case str if str.contains("ip_range#") =>
Some(field -> data.unsafeAs[IpRangeAggregationResponse](IpRangeAggregationResponse.decoder))
case str if str.contains("max#") =>
Some(field -> MaxAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("min#") =>
Expand Down Expand Up @@ -202,6 +216,8 @@ private[elasticsearch] case class BucketDecoder(fields: Chunk[(String, Json)]) e
(field.split("#")(1), data.asInstanceOf[ExtendedStatsAggregationResponse])
case str if str.contains("filter#") =>
(field.split("#")(1), data.asInstanceOf[FilterAggregationResponse])
case str if str.contains("ip_range#") =>
(field.split("#")(1), data.asInstanceOf[IpRangeAggregationResponse])
case str if str.contains("max#") =>
(field.split("#")(1), data.asInstanceOf[MaxAggregationResponse])
case str if str.contains("min#") =>
Expand Down Expand Up @@ -285,6 +301,27 @@ private[elasticsearch] sealed trait JsonDecoderOps {
}
}

private[elasticsearch] final case class IpRangeAggregationBucket(
key: String,
from: Option[String],
to: Option[String],
@jsonField("doc_count")
docCount: Int
) extends AggregationBucket

private[elasticsearch] object IpRangeAggregationBucket {
implicit val decoder: JsonDecoder[IpRangeAggregationBucket] = DeriveJsonDecoder.gen[IpRangeAggregationBucket]
}

private[elasticsearch] final case class IpRangeAggregationResponse(
buckets: Chunk[IpRangeAggregationBucket]
) extends AggregationResponse

private[elasticsearch] object IpRangeAggregationResponse {
implicit val decoder: JsonDecoder[IpRangeAggregationResponse] =
DeriveJsonDecoder.gen[IpRangeAggregationResponse]
}

private[elasticsearch] final case class MaxAggregationResponse(value: Double) extends AggregationResponse

private[elasticsearch] object MaxAggregationResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ private[elasticsearch] final case class SearchWithAggregationsResponse(
ExtendedStatsAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("filter#") =>
FilterAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("ip_range#") =>
IpRangeAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("max#") =>
MaxAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("min#") =>
Expand Down
12 changes: 12 additions & 0 deletions modules/library/src/main/scala/zio/elasticsearch/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ package object elasticsearch extends IndexNameNewtype with IndexPatternNewtype w
def asExtendedStatsAggregation(name: String): RIO[R, Option[ExtendedStatsAggregationResult]] =
aggregationAs[ExtendedStatsAggregationResult](name)

/**
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
*
* @param name
* the name of the aggregation to retrieve
* @return
* a [[RIO]] effect that, when executed, will produce the aggregation as instance of
* [[result.IpRangeAggregationResult]].
*/
def asIpRangeAggregation(name: String): RIO[R, Option[IpRangeAggregationResult]] =
aggregationAs[IpRangeAggregationResult](name)

/**
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,26 @@ final case class FilterAggregationResult private[elasticsearch] (
}
}

final case class IpRangeAggregationResult private[elasticsearch] (
buckets: Chunk[IpRangeAggregationBucketResult]
) extends AggregationResult

final case class IpRangeAggregationBucketResult private[elasticsearch] (
key: String,
from: Option[String],
to: Option[String],
docCount: Int,
subAggregations: Map[String, AggregationResult]
) extends AggregationResult {

def subAggregationAs[A <: AggregationResult](aggName: String): Either[DecodingException, Option[A]] =
subAggregations.get(aggName) match {
case Some(agg: A) => Right(Some(agg))
case Some(_) => Left(DecodingException(s"Aggregation with name $aggName was not of type you provided."))
case None => Right(None)
}
}

final case class MaxAggregationResult private[elasticsearch] (value: Double) extends AggregationResult

final case class MinAggregationResult private[elasticsearch] (value: Double) extends AggregationResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,32 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
equalTo(ExtendedStats(name = "aggregation", field = "intField", missing = Some(20.0), sigma = Some(3.0)))
)
},
test("ipRange") {
val aggregation =
ipRangeAggregation(
name = "ip_range_agg",
field = "ipField",
ranges = Chunk(
IpRange.IpRangeBound(to = Some("10.0.0.5")),
IpRange.IpRangeBound(from = Some("10.0.0.5"))
)
)

assert(aggregation)(
equalTo(
IpRange(
name = "ip_range_agg",
field = "ipField",
ranges = Chunk(
IpRange.IpRangeBound(to = Some("10.0.0.5")),
IpRange.IpRangeBound(from = Some("10.0.0.5"))
),
keyed = None,
subAggregations = None
)
)
)
},
test("filter") {
val query = term(TestDocument.stringField, "test")
val aggregation = filterAggregation("aggregation", query)
Expand Down Expand Up @@ -967,6 +993,73 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
assert(aggregationWithSubAggregation.toJson)(equalTo(expectedWithSubAggregation.toJson)) &&
assert(aggregationWithMultipleSubAggregations.toJson)(equalTo(expectedWithMultipleSubAggregations.toJson))
},
test("ip_range aggregation with from/to ") {
val agg = IpRange(
name = "ip_range_agg",
field = "ip",
ranges = Chunk(
IpRange.IpRangeBound(to = Some("10.0.0.5")),
IpRange.IpRangeBound(from = Some("10.0.0.5"))
),
keyed = None,
subAggregations = None
)

val expectedJson =
"""
|{
| "ip_range_agg": {
| "ip_range": {
| "field": "ip",
| "ranges": [
| {
| "to": "10.0.0.5"
| },
| {
| "from": "10.0.0.5"
| }
| ]
| }
| }
|}
|""".stripMargin

assert(agg.toJson)(equalTo(expectedJson.toJson))
},
test("ip_range aggregation with CIDR masks and keyed = true") {
val agg = IpRange(
name = "ip_range_agg",
field = "ip",
ranges = Chunk(
IpRange.IpRangeBound(mask = Some("10.0.0.0/25")),
IpRange.IpRangeBound(mask = Some("10.0.0.127/25"))
),
keyed = Some(true),
subAggregations = None
)

val expectedJson =
"""
|{
| "ip_range_agg": {
| "ip_range": {
| "field": "ip",
| "ranges": [
| {
| "mask": "10.0.0.0/25"
| },
| {
| "mask": "10.0.0.127/25"
| }
| ],
| "keyed": true
| }
| }
|}
|""".stripMargin

assert(agg.toJson)(equalTo(expectedJson.toJson))
},
test("max") {
val aggregation = maxAggregation("aggregation", "testField")
val aggregationTs = maxAggregation("aggregation", TestDocument.intField)
Expand Down
Loading