From 629feb83f9ccd4a19424f3a76c0dca5dc1168dfe Mon Sep 17 00:00:00 2001 From: Benjamin Gaidioz Date: Wed, 19 Mar 2025 15:10:20 +0100 Subject: [PATCH 1/2] RD-15450: SOQL function --- .../das/salesforce/DASSalesforce.scala | 10 +- .../salesforce/DASSalesforceFunction.scala | 20 ++ .../DASSalesforceSOQLFunction.scala | 92 ++++++++ .../DASSalesforceSoqlFunctionTest.scala | 213 ++++++++++++++++++ 4 files changed, 332 insertions(+), 3 deletions(-) create mode 100644 src/main/scala/com/rawlabs/das/salesforce/DASSalesforceFunction.scala create mode 100644 src/main/scala/com/rawlabs/das/salesforce/DASSalesforceSOQLFunction.scala create mode 100644 src/test/scala/com/rawlabs/das/salesforce/DASSalesforceSoqlFunctionTest.scala diff --git a/src/main/scala/com/rawlabs/das/salesforce/DASSalesforce.scala b/src/main/scala/com/rawlabs/das/salesforce/DASSalesforce.scala index 1091cc5..77a3321 100644 --- a/src/main/scala/com/rawlabs/das/salesforce/DASSalesforce.scala +++ b/src/main/scala/com/rawlabs/das/salesforce/DASSalesforce.scala @@ -108,10 +108,14 @@ class DASSalesforce(options: Map[String, String]) extends DASSdk with StrictLogg override def tableDefinitions: Seq[TableDefinition] = definitions - override def functionDefinitions: Seq[FunctionDefinition] = Seq.empty - override def getTable(name: String): Option[DASTable] = allTables.find(_.tableName == name) - override def getFunction(name: String): Option[DASFunction] = None + private val allFunctions = Seq(new DASSalesforceSOQLFunction(q => connector.paginatedSOQL(q).flatten.toSeq)) + .map(f => f.definition.getFunctionId.getName -> f) + .toMap + + override def functionDefinitions: Seq[FunctionDefinition] = allFunctions.values.map(_.definition).toSeq + + override def getFunction(name: String): Option[DASFunction] = allFunctions.get(name) } diff --git a/src/main/scala/com/rawlabs/das/salesforce/DASSalesforceFunction.scala b/src/main/scala/com/rawlabs/das/salesforce/DASSalesforceFunction.scala new file mode 100644 index 0000000..ad8e9e6 --- /dev/null +++ b/src/main/scala/com/rawlabs/das/salesforce/DASSalesforceFunction.scala @@ -0,0 +1,20 @@ +/* + * Copyright 2024 RAW Labs S.A. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0, included in the file + * licenses/APL.txt. + */ + +package com.rawlabs.das.salesforce + +import com.rawlabs.das.sdk.scala.DASFunction +import com.rawlabs.protocol.das.v1.functions.FunctionDefinition + +trait DASSalesforceFunction extends DASFunction { + val definition: FunctionDefinition +} diff --git a/src/main/scala/com/rawlabs/das/salesforce/DASSalesforceSOQLFunction.scala b/src/main/scala/com/rawlabs/das/salesforce/DASSalesforceSOQLFunction.scala new file mode 100644 index 0000000..a0c0e79 --- /dev/null +++ b/src/main/scala/com/rawlabs/das/salesforce/DASSalesforceSOQLFunction.scala @@ -0,0 +1,92 @@ +/* + * Copyright 2024 RAW Labs S.A. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0, included in the file + * licenses/APL.txt. + */ + +package com.rawlabs.das.salesforce + +import com.rawlabs.das.sdk.DASSdkInvalidArgumentException +import com.rawlabs.protocol.das.v1.functions.{FunctionDefinition, FunctionId, ParameterDefinition} +import com.rawlabs.protocol.das.v1.types._ +import com.typesafe.scalalogging.StrictLogging + +class DASSalesforceSOQLFunction(executeSOQL: String => Iterable[Map[_, _]]) + extends DASSalesforceFunction + with StrictLogging { + + val definition: FunctionDefinition = FunctionDefinition + .newBuilder() + .setFunctionId(FunctionId.newBuilder().setName("soql")) + .setDescription("Executes a SOQL query against Salesforce.") + .addParams( + ParameterDefinition + .newBuilder() + .setName("q") + .setType(Type.newBuilder().setString(StringType.newBuilder().setNullable(false)).build()) + .build()) + .setReturnType(Type + .newBuilder() + .setList(ListType.newBuilder().setInnerType(Type.newBuilder().setRecord(RecordType.newBuilder())))) + .build() + + override def execute(args: Map[String, Value]): Value = { + args.get("q") match { + case Some(value) => + if (value.hasString) { + val soql = value.getString.getV + val items = for (map <- executeSOQL(soql)) yield { + val builder = ValueRecord.newBuilder() + map.foreach { + case (k: String, v) => + val attrValue = anyToValue(v) + builder.addAtts(ValueRecordAttr.newBuilder().setName(k).setValue(attrValue)) + case kv => logger.warn(s"Unsupported row item: ${kv.getClass}") + } + Value.newBuilder().setRecord(builder.build()).build() + } + val result = ValueList.newBuilder() + items.foreach(result.addValues) + Value.newBuilder().setList(result.build()).build() + } else { + throw new DASSdkInvalidArgumentException("Invalid parameter type for 'q'") + } + case None => throw new DASSdkInvalidArgumentException("Missing required parameter 'q'") + } + } + + private def anyToValue(value: Any): Value = { + logger.info(s"Converting value: $value") + value match { + case null => Value.newBuilder().setNull(ValueNull.newBuilder()).build() + case v: String => Value.newBuilder().setString(ValueString.newBuilder().setV(v)).build() + case v: Int => Value.newBuilder().setInt(ValueInt.newBuilder().setV(v)).build() + case v: Long => Value.newBuilder().setLong(ValueLong.newBuilder().setV(v)).build() + case v: Double => Value.newBuilder().setDouble(ValueDouble.newBuilder().setV(v)).build() + case v: Boolean => Value.newBuilder().setBool(ValueBool.newBuilder().setV(v)).build() + case v: Map[_, _] => + val builder = ValueRecord.newBuilder() + v.foreach { + case (k: String, v) => + val attrValue = anyToValue(v) + builder.addAtts(ValueRecordAttr.newBuilder().setName(k).setValue(attrValue)) + case _ => logger.warn(s"Unsupported key type: ${v.getClass}") + } + Value.newBuilder().setRecord(builder.build()).build() + case v: Iterable[_] => + val builder = ValueList.newBuilder() + v.foreach { item => + val attrValue = anyToValue(item) + builder.addValues(attrValue) + } + Value.newBuilder().setList(builder.build()).build() + case _ => throw new DASSdkInvalidArgumentException(s"Unsupported value type: ${value.getClass}") + } + } +} diff --git a/src/test/scala/com/rawlabs/das/salesforce/DASSalesforceSoqlFunctionTest.scala b/src/test/scala/com/rawlabs/das/salesforce/DASSalesforceSoqlFunctionTest.scala new file mode 100644 index 0000000..650cc6c --- /dev/null +++ b/src/test/scala/com/rawlabs/das/salesforce/DASSalesforceSoqlFunctionTest.scala @@ -0,0 +1,213 @@ +/* + * Copyright 2024 RAW Labs S.A. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0, included in the file + * licenses/APL.txt. + */ + +package com.rawlabs.das.salesforce + +import scala.jdk.CollectionConverters.IterableHasAsScala + +import org.scalatest.funsuite.AnyFunSuite + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import com.rawlabs.das.sdk.DASSdkInvalidArgumentException +import com.rawlabs.protocol.das.v1.types.{Value, ValueInt, ValueNull, ValueString} + +class DASSalesforceSoqlFunctionTest extends AnyFunSuite { + + test("Fails gracefully if q (soql query string) is absent") { + val f = new DASSalesforceSOQLFunction(_ => Seq.empty) + assertThrows[DASSdkInvalidArgumentException] { + f.execute(Map.empty) + } + assertThrows[DASSdkInvalidArgumentException] { + f.execute( + Map("query" -> Value.newBuilder().setString(ValueString.newBuilder().setV("SELECT Id From Event")).build())) + } + } + + test("Fails gracefully if q (string) is of the wrong type") { + val f = new DASSalesforceSOQLFunction(_ => Seq.empty) + // Int + assertThrows[DASSdkInvalidArgumentException] { + f.execute(Map("q" -> Value.newBuilder().setInt(ValueInt.newBuilder().setV(14)).build())) + } + // Null + assertThrows[DASSdkInvalidArgumentException] { + f.execute(Map("q" -> Value.newBuilder().setNull(ValueNull.newBuilder()).build())) + } + } + + test("Successfully processes a couple of JSON rows returned by the Salesforce API") { + // The Salesforce output is a JSON array containing two records. + val json = jsonStringToSalesforceValue( + """[{"Id": "1", "Name": "Jane Doe"},{"Id": "2", "Name": "John Doe"},{"Id": "3", "Name": "Joe Dohn"}]""") + val f = new DASSalesforceSOQLFunction(_ => json) + val result = runWith(f, "...") // soql is not used in this test + assert(result.getList.getValuesCount == 3) + val item1 = result.getList.getValues(0) + + assert(item1.hasRecord) + assert(item1.getRecord.getAttsCount == 2) + + assert(item1.getRecord.getAtts(0).getName == "Id") + assert(item1.getRecord.getAtts(0).getValue.getString.getV == "1") + assert(item1.getRecord.getAtts(1).getName == "Name") + assert(item1.getRecord.getAtts(1).getValue.getString.getV == "Jane Doe") + + val item2 = result.getList.getValues(1) + assert(item2.hasRecord) + assert(item2.getRecord.getAttsCount == 2) + assert(item2.getRecord.getAtts(0).getName == "Id") + assert(item2.getRecord.getAtts(0).getValue.getString.getV == "2") + assert(item2.getRecord.getAtts(1).getName == "Name") + assert(item2.getRecord.getAtts(1).getValue.getString.getV == "John Doe") + + val item3 = result.getList.getValues(2) + assert(item3.hasRecord) + assert(item3.getRecord.getAttsCount == 2) + assert(item3.getRecord.getAtts(0).getName == "Id") + assert(item3.getRecord.getAtts(0).getValue.getString.getV == "3") + assert(item3.getRecord.getAtts(1).getName == "Name") + assert(item3.getRecord.getAtts(1).getValue.getString.getV == "Joe Dohn") + } + + test("Successfully parses an empty record") { + // The Salesforce output is a JSON array containing one record. Just make sure it + // doesn't throw an exception. + val json = jsonStringToSalesforceValue("[{}]") + val f = new DASSalesforceSOQLFunction(_ => json) + val result = runWith(f, "SELECT * FROM table") + assert(result.getList.getValuesCount == 1) + val item = result.getList.getValues(0) + assert(item.hasRecord) + assert(item.getRecord.getAttsCount == 0) + } + + test("Successfully parses an empty result set") { + // The Salesforce output is a JSON array containing no records. That would happen if + // the query returned no results. + val f = new DASSalesforceSOQLFunction(_ => Seq.empty) + val result = runWith(f, "SELECT * FROM table") + assert(result.getList.getValuesCount == 0) + } + + test("Successfully parses an integer field") { + val json = jsonStringToSalesforceValue("""[{"Id": 1}]""") + val f = new DASSalesforceSOQLFunction(_ => json) + val result = runWith(f, "SELECT Id FROM table") + assert(result.getList.getValuesCount == 1) + val item = result.getList.getValues(0) + assert(item.hasRecord) + assert(item.getRecord.getAttsCount == 1) + assert(item.getRecord.getAtts(0).getName == "Id") + assert(item.getRecord.getAtts(0).getValue.getInt.getV == 1) + } + + test("Successfully parses a double field") { + val json = jsonStringToSalesforceValue("""[{"Id": 3.14}]""") + val f = new DASSalesforceSOQLFunction(_ => json) + val result = runWith(f, "SELECT Id FROM table") + assert(result.getList.getValuesCount == 1) + val item = result.getList.getValues(0) + assert(item.hasRecord) + assert(item.getRecord.getAttsCount == 1) + assert(item.getRecord.getAtts(0).getName == "Id") + assert(item.getRecord.getAtts(0).getValue.getDouble.getV == 3.14) + } + + test("Successfully parses a boolean field") { + val json = jsonStringToSalesforceValue("""[{"IsDeleted": true}]""") + val f = new DASSalesforceSOQLFunction(_ => json) + val result = runWith(f, "SELECT IsDeleted FROM table") + assert(result.getList.getValuesCount == 1) + val item = result.getList.getValues(0) + assert(item.hasRecord) + assert(item.getRecord.getAttsCount == 1) + assert(item.getRecord.getAtts(0).getName == "IsDeleted") + assert(item.getRecord.getAtts(0).getValue.getBool.getV == true) + } + + test("Successfully processes an array of primitives") { + // The Salesforce output is a JSON array containing one record, + // where the field "Numbers" is itself an array of integers. + val jsonNumbers = jsonStringToSalesforceValue("""[{"Numbers": [1, 2, 3]}]""") + val f = new DASSalesforceSOQLFunction(_ => jsonNumbers) + val result = runWith(f, "SELECT value FROM table") + // Expect one row (record) + assert(result.getList.getValuesCount == 1) + val row = result.getList.getValues(0) + assert(row.hasRecord) + assert(row.getRecord.getAttsCount == 1) + val numbersField = row.getRecord.getAtts(0) + assert(numbersField.getName == "Numbers") + val numbersValue = numbersField.getValue + assert(numbersValue.hasList) + val numbersList = numbersValue.getList.getValuesList + assert(numbersList.size() == 3) + assert(numbersList.get(0).hasInt) + assert(numbersList.get(0).getInt.getV == 1) + assert(numbersList.get(1).hasInt) + assert(numbersList.get(1).getInt.getV == 2) + assert(numbersList.get(2).hasInt) + assert(numbersList.get(2).getInt.getV == 3) + } + + test("Successfully processes a single JSON object") { + // The Salesforce output is a JSON array with one object. + // The object has an "Id" and an "Address" field. + val jsonObject = + jsonStringToSalesforceValue("""[{"Id": "1", "Address": {"Street": "Jane Doe St", "City": "San Francisco"}}]""") + val f = new DASSalesforceSOQLFunction(_ => jsonObject) + val result = runWith(f, "SELECT Id, Address FROM table") + assert(result.getList.getValuesCount == 1) + val row = result.getList.getValues(0) + assert(row.hasRecord) + val record = row.getRecord + // Expect two fields: "Id" and "Address" + assert(record.getAttsCount == 2) + val idField = record.getAtts(0) + assert(idField.getName == "Id") + assert(idField.getValue.hasString) + assert(idField.getValue.getString.getV == "1") + val addressField = record.getAtts(1) + assert(addressField.getName == "Address") + assert(addressField.getValue.hasRecord) + val addressRecord = addressField.getValue.getRecord + // Expect the address record to have two fields: "Street" and "City" + assert(addressRecord.getAttsCount == 2) + val streetField = addressRecord.getAttsList.asScala + .find(_.getName == "Street") + .getOrElse(fail("Missing 'Street' field in Address")) + assert(streetField.getValue.hasString) + assert(streetField.getValue.getString.getV == "Jane Doe St") + val cityField = addressRecord.getAttsList.asScala + .find(_.getName == "City") + .getOrElse(fail("Missing 'City' field in Address")) + assert(cityField.getValue.hasString) + assert(cityField.getValue.getString.getV == "San Francisco") + } + + // A helper to wrap the query string in a Value object. + private def runWith(f: DASSalesforceSOQLFunction, soql: String): Value = { + f.execute(Map("q" -> Value.newBuilder().setString(ValueString.newBuilder().setV(soql)).build())) + } + + // jsonMapper with scala module + private val jsonMapper = new ObjectMapper().registerModule(DefaultScalaModule) + + // Convert a JSON string to a Scala value. That Scala value is what we get from the + // Salesforce API. (The JSON string input argument is a convenience for the test.) + private def jsonStringToSalesforceValue(stringOutput: String) = { + jsonMapper.readValue(stringOutput, classOf[Array[Map[String, Any]]]) + } + +} From e02362cc452ae191eeab45b09a3f9dbb99a5e0fe Mon Sep 17 00:00:00 2001 From: Benjamin Gaidioz Date: Tue, 1 Apr 2025 15:40:00 +0200 Subject: [PATCH 2/2] Removing log --- .../com/rawlabs/das/salesforce/DASSalesforceSOQLFunction.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/scala/com/rawlabs/das/salesforce/DASSalesforceSOQLFunction.scala b/src/main/scala/com/rawlabs/das/salesforce/DASSalesforceSOQLFunction.scala index a0c0e79..d5a1faa 100644 --- a/src/main/scala/com/rawlabs/das/salesforce/DASSalesforceSOQLFunction.scala +++ b/src/main/scala/com/rawlabs/das/salesforce/DASSalesforceSOQLFunction.scala @@ -62,7 +62,6 @@ class DASSalesforceSOQLFunction(executeSOQL: String => Iterable[Map[_, _]]) } private def anyToValue(value: Any): Value = { - logger.info(s"Converting value: $value") value match { case null => Value.newBuilder().setNull(ValueNull.newBuilder()).build() case v: String => Value.newBuilder().setString(ValueString.newBuilder().setV(v)).build()