diff --git a/demo/README.md b/demo/README.md new file mode 100644 index 0000000..10600a3 --- /dev/null +++ b/demo/README.md @@ -0,0 +1,159 @@ +# Simple Inline Test Demo + +## TL;DR + +```bash +bash run.sh junit +# OR +bash run.sh assert +``` + +## Demo Structure + +This demo is made of the following files and directories: + +* `amazon-sqs-java-extended-client-lib`: The target project to conduct the demo. +* `inlinetest-1.0.jar`: Jar for ITest -- the Java inline test framework. +* `junit-platform-console-standalone-1.12.0.jar`: JUnit standalone jar to execute JUnit inline tests with. +* `run.sh`: A `bash` script to demonstrate the process of running an inline test. +* `inline-tests`: This generated directory contains generated sources and bytecode of the inline test. +* `clean.sh`: A `bash` script to clean files generated during the demo. +* `diff.txt`: Shows the changes made to the project under demonstration. Contains lines for adding inline test dependency to `pom.xml`, adding imports, and adding the inline test itself. +* `out.txt`: Generated output of running the demo. + +## Output of the script + +These are some notable files that will be generated as a result of running the script, represented in relative path from the `demo` directory: + +* `amazon-sqs-java-extended-client-lib/deps.txt`: A colon-separated list of dependencies for the project. This file is used during parsing, compiling and executing the inline test. +* `inline-tests/src/AmazonSQSExtendedClient_0Test.java`: The test class that is obtained by parsing an inline test. +* `inline-tests/bin/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient_0Test.class`: Generated bytecode from the parsed inline test class. +* `out.txt`: Generated output of running the demo. + +## Target Statement + +Inline tests are designed to provide lightweight oracles for various developer-chosen inputs and outputs for complex single statements. As of now, the ITest framework majorly supports inline tests for 4 kinds of target statements: + +* Bit manipulation +* String manipulation +* Regular expressions +* Java Stream + +In this example, on line 887-888 of the source file `amazon-sqs-java-extended-client-lib/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java`, there is a string manipulation target statement: + +```java +// SQSExtendedClientConstants.S3_KEY_MARKER is "-..s3Key..-" +int secondOccurence = receiptHandle.indexOf(SQSExtendedClientConstants.S3_KEY_MARKER, + receiptHandle.indexOf(SQSExtendedClientConstants.S3_KEY_MARKER) + 1); +``` + +There are more complicated examples in the wild, but this can be a good case for demonstration. + +## Inline Test + +An inline test can be added below this statement: + +```java +itest().given(receiptHandle, "1FOc=-..s3Key..-y*-@T-..s3Key..-").checkEq(secondOccurence, 21); +``` + +It checks that if the variables on the right hand side of the assignment are given the values specified in the `given` clause, the value on the left hand side (`secondOccurence`) is expected to be `21`. The developer can specify multiple such inline tests below a target statement to test for different cases. + +Before adding the inline test, the method containing the target statement looks like this: + +```java +private String getOrigReceiptHandle(String receiptHandle) { + int secondOccurence = receiptHandle.indexOf(SQSExtendedClientConstants.S3_KEY_MARKER, + receiptHandle.indexOf(SQSExtendedClientConstants.S3_KEY_MARKER) + 1); + return receiptHandle.substring(secondOccurence + SQSExtendedClientConstants.S3_KEY_MARKER.length()); +} +``` + +After adding the inline test, the enclosing method of the target statement looks like this: + +```java +private String getOrigReceiptHandle(String receiptHandle) { + int secondOccurence = receiptHandle.indexOf(SQSExtendedClientConstants.S3_KEY_MARKER, + receiptHandle.indexOf(SQSExtendedClientConstants.S3_KEY_MARKER) + 1); + itest().given(receiptHandle, "1FOc=-..s3Key..-y*-@T-..s3Key..-").checkEq(secondOccurence, 21); + return receiptHandle.substring(secondOccurence + SQSExtendedClientConstants.S3_KEY_MARKER.length()); +} +``` + +In this example, the developer assigns the index of the second occurrence of `"-..s3Key..-"` in `receiptHandle` to the variable `secondOccurence`. Therefore, the inline test constructed for this example can provide different values for `receiptHandle` using the `given` clause, and check their corresponding values of `secondOccurence` with the `checkEq` clause. + +## Parsing Inline Test + +Parsing an inline test is defined as the process of using the ITest tool to turn a (target statement, inline test) pair, in this case these two lines: + +```java +int secondOccurence = receiptHandle.indexOf(SQSExtendedClientConstants.S3_KEY_MARKER, + receiptHandle.indexOf(SQSExtendedClientConstants.S3_KEY_MARKER) + 1); +itest().given(receiptHandle, "1FOc=-..s3Key..-y*-@T-..s3Key..-").checkEq(secondOccurence, 21); +``` + +to either: + +1. a class with a main method and uses Java's default asserts +2. or a JUnit class that uses JUnit's asserts + +This process is demonstrated in `run.sh`'s `parse_inline_test` function. + +### Assertion Style + +In the process of parsing an inline test, one can choose to parse inline into either a simple Java class, or a JUnit Java class. + +### Content of the generated JUnit class + +```java +package com.amazon.sqs.javamessaging; + +// Other imports omitted for space +import org.inlinetest.ITest; +import static org.inlinetest.ITest.itest; +import static org.inlinetest.ITest.group; + +public class AmazonSQSExtendedClient_0Test { + + @Test + void testLine889() throws Exception { + String receiptHandle = "1FOc=-..s3Key..-y*-@T-..s3Key..-"; + int secondOccurence = receiptHandle.indexOf(SQSExtendedClientConstants.S3_KEY_MARKER, receiptHandle.indexOf(SQSExtendedClientConstants.S3_KEY_MARKER) + 1); + assertEquals(21, secondOccurence); + } +} +``` + +### Content of the generated assertion class + +```java +package com.amazon.sqs.javamessaging; + +// Other imports omitted for space +import org.inlinetest.ITest; +import static org.inlinetest.ITest.itest; +import static org.inlinetest.ITest.group; + +public class AmazonSQSExtendedClient_0Test { + + static void testLine889() { + String receiptHandle = "1FOc=-..s3Key..-y*-@T-..s3Key..-"; + int secondOccurence = receiptHandle.indexOf(SQSExtendedClientConstants.S3_KEY_MARKER, receiptHandle.indexOf(SQSExtendedClientConstants.S3_KEY_MARKER) + 1); + assert Objects.equals(secondOccurence, 21); + } + + public static void main(String[] args) { + testLine889(); + System.out.println("inline tests passed: 1"); + } +} +``` + +## Compiling Inline Test + +This is just using Java compiler to compile the parsed class. Details in `run.sh`'s `compile_inline_test` function. + +## Executing Inline Test + +Depending on the assertion style parameter, one can either directly use `java` to execute the inline test class, or use JUnit standalone jar to execute the JUnit inline test class. Details in `run.sh`'s `execute_inline_test` function. + diff --git a/demo/amazon-sqs-java-extended-client-lib/LICENSE.txt b/demo/amazon-sqs-java-extended-client-lib/LICENSE.txt new file mode 100644 index 0000000..2c99318 --- /dev/null +++ b/demo/amazon-sqs-java-extended-client-lib/LICENSE.txt @@ -0,0 +1,51 @@ +Apache License +Version 2.0, January 2004 + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + +"License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. + +"Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. + +"Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. + +"You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. + +"Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. + +"Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. + +"Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). + +"Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. + +"Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." + +"Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: + + 1. You must give any other recipients of the Work or Derivative Works a copy of this License; and + 2. You must cause any modified files to carry prominent notices stating that You changed the files; and + 3. You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and + 4. If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. + +You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS diff --git a/demo/amazon-sqs-java-extended-client-lib/README.md b/demo/amazon-sqs-java-extended-client-lib/README.md new file mode 100644 index 0000000..a91cfc6 --- /dev/null +++ b/demo/amazon-sqs-java-extended-client-lib/README.md @@ -0,0 +1,46 @@ +Amazon SQS Extended Client Library for Java +=========================================== +The **Amazon SQS Extended Client Library for Java** enables you to manage Amazon SQS message payloads with Amazon S3. This is especially useful for storing and retrieving messages with a message payload size greater than the current SQS limit of 256 KB, up to a maximum of 2 GB. Specifically, you can use this library to: + +* Specify whether message payloads are always stored in Amazon S3 or only when a message's size exceeds 256 KB. + +* Send a message that references a single message object stored in an Amazon S3 bucket. + +* Get the corresponding message object from an Amazon S3 bucket. + +* Delete the corresponding message object from an Amazon S3 bucket. + +You can download release builds through the [releases section of this](https://github.com/awslabs/amazon-sqs-java-extended-client-lib) project. + +For more information on using the amazon-sqs-java-extended-client-lib, see our getting started guide [here](http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/s3-messages.html). + +## Getting Started + +* **Sign up for AWS** -- Before you begin, you need an AWS account. For more information about creating an AWS account and retrieving your AWS credentials, see [AWS Account and Credentials](http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/java-dg-setup.html) in the AWS SDK for Java Developer Guide. +* **Sign up for Amazon SQS** -- Go to the Amazon [SQS console](https://console.aws.amazon.com/sqs/home?region=us-east-1) to sign up for the service. +* **Minimum requirements** -- To use the sample application, you'll need Java 7 (or later) and [Maven 3](http://maven.apache.org/). For more information about the requirements, see the [Getting Started](http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/s3-messages.html) section of the Amazon SQS Developer Guide. +* **Download** -- Download the [latest preview release](https://github.com/awslabs/amazon-sqs-java-extended-client-lib/releases) or pick it up from Maven: +### Version 2.x +```xml + + com.amazonaws + amazon-sqs-java-extended-client-lib + 2.0.2 + jar + +``` + +### Version 1.x +```xml + + com.amazonaws + amazon-sqs-java-extended-client-lib + 1.2.2 + jar + +``` +* **Further information** - Read the [API documentation](http://aws.amazon.com/documentation/sqs/). + +## Feedback +* Give us feedback [here](https://github.com/awslabs/amazon-sqs-java-extended-client-lib/issues). +* If you'd like to contribute a new feature or bug fix, we'd love to see Github pull requests from you. diff --git a/demo/amazon-sqs-java-extended-client-lib/VERSIONING.md b/demo/amazon-sqs-java-extended-client-lib/VERSIONING.md new file mode 100644 index 0000000..d8ee950 --- /dev/null +++ b/demo/amazon-sqs-java-extended-client-lib/VERSIONING.md @@ -0,0 +1,20 @@ +## Versioning Policy + +We use a three-part X.Y.Z (Major.Minor.Patch) versioning definition, as follows: +* X (Major) version changes are significant and expected to break backwards compatibility. +* Y (Minor) version changes are moderate changes. These include: + * Significant non-breaking feature additions. + * Any change to the version of a dependency. + * Possible backwards-incompatible changes. These changes will be noted and explained in detail in the release notes. +* Z (Patch) version changes are small changes. These changes will not break backwards compatibility. + * Z releases will also include warning of upcoming breaking changes, whenever possible. + +## What this means for you + +We recommend running the most recent version. Here are our suggestions for managing updates: + +* X changes will require some effort to incorporate. +* Y changes will not require significant effort to incorporate. + * If you have good unit and integration tests, these changes are generally safe to pick up automatically. +* Z changes will not require any changes to your code. Z changes are intended to be picked up automatically. + * Good unit and integration tests are always recommended. \ No newline at end of file diff --git a/demo/amazon-sqs-java-extended-client-lib/pom.xml b/demo/amazon-sqs-java-extended-client-lib/pom.xml new file mode 100644 index 0000000..53a9ce2 --- /dev/null +++ b/demo/amazon-sqs-java-extended-client-lib/pom.xml @@ -0,0 +1,190 @@ + + + 4.0.0 + + com.amazonaws + amazon-sqs-java-extended-client-lib + 2.0.2 + jar + Amazon SQS Extended Client Library for Java + An extension to the Amazon SQS client that enables sending and receiving messages up to 2GB via Amazon S3. + + https://github.com/awslabs/amazon-sqs-java-extended-client-lib/ + + + https://github.com/awslabs/amazon-sqs-java-extended-client-lib.git + + + + + Apache License, Version 2.0 + https://aws.amazon.com/apache2.0 + repo + + + + + + amazonwebservices + Amazon Web Services + https://aws.amazon.com + + developer + + + + + + 2.14.19 + + + + + software.amazon.awssdk + sqs + ${aws-java-sdk.version} + + + software.amazon.awssdk + s3 + ${aws-java-sdk.version} + + + + software.amazon.payloadoffloading + payloadoffloading-common + 2.1.2 + + + + + commons-logging + commons-logging + 1.2 + + + + junit + junit + 4.13.1 + test + + + org.mockito + mockito-core + 3.12.4 + test + + + org.inlinetest + inlinetest + 1.0 + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.2 + + 1.8 + 1.8 + UTF-8 + + + + + + + org.apache.maven.plugins + maven-source-plugin + 2.2.1 + + + attach-sources + + jar-no-fork + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.9.1 + + + attach-javadocs + + jar + + + + -Xdoclint:none + + + + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.7 + true + + ossrh + https://aws.oss.sonatype.org + true + + + + + + + + publishing + + + + ossrh + https://oss.sonatype.org/content/repositories/snapshots + + + ossrh + https://oss.sonatype.org/service/local/staging/deploy/maven2/ + + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + + sign-artifacts + verify + + sign + + + + --pinentry-mode + loopback + + + + + + + + + + diff --git a/demo/amazon-sqs-java-extended-client-lib/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java b/demo/amazon-sqs-java-extended-client-lib/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java new file mode 100644 index 0000000..1fdd5f4 --- /dev/null +++ b/demo/amazon-sqs-java-extended-client-lib/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java @@ -0,0 +1,1024 @@ +/* + * Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.sqs.javamessaging; + +import java.lang.UnsupportedOperationException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import software.amazon.awssdk.awscore.AwsRequest; +import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.ApiName; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.core.util.VersionInfo; +import software.amazon.awssdk.services.sqs.SqsClient; + +import software.amazon.awssdk.services.sqs.model.BatchEntryIdsNotDistinctException; +import software.amazon.awssdk.services.sqs.model.BatchRequestTooLongException; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityResponse; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; +import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse; +import software.amazon.awssdk.services.sqs.model.EmptyBatchRequestException; +import software.amazon.awssdk.services.sqs.model.InvalidBatchEntryIdException; +import software.amazon.awssdk.services.sqs.model.InvalidIdFormatException; +import software.amazon.awssdk.services.sqs.model.InvalidMessageContentsException; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.model.MessageNotInflightException; +import software.amazon.awssdk.services.sqs.model.OverLimitException; +import software.amazon.awssdk.services.sqs.model.PurgeQueueInProgressException; +import software.amazon.awssdk.services.sqs.model.PurgeQueueRequest; +import software.amazon.awssdk.services.sqs.model.PurgeQueueResponse; +import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException; +import software.amazon.awssdk.services.sqs.model.ReceiptHandleIsInvalidException; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageResponse; +import software.amazon.awssdk.services.sqs.model.SqsException; +import software.amazon.awssdk.services.sqs.model.TooManyEntriesInBatchRequestException; +import software.amazon.awssdk.utils.StringUtils; +import software.amazon.payloadoffloading.PayloadS3Pointer; +import software.amazon.payloadoffloading.PayloadStore; +import software.amazon.payloadoffloading.S3BackedPayloadStore; +import software.amazon.payloadoffloading.S3Dao; +import software.amazon.payloadoffloading.Util; + +import org.inlinetest.ITest; +import static org.inlinetest.ITest.itest; +import static org.inlinetest.ITest.group; + +/** + * Amazon SQS Extended Client extends the functionality of Amazon SQS client. + * All service calls made using this client are blocking, and will not return + * until the service call completes. + * + *

+ * The Amazon SQS extended client enables sending and receiving large messages + * via Amazon S3. You can use this library to: + *

+ * + * + */ +public class AmazonSQSExtendedClient extends AmazonSQSExtendedClientBase implements SqsClient { + static final String USER_AGENT_NAME = AmazonSQSExtendedClient.class.getSimpleName(); + static final String USER_AGENT_VERSION = VersionInfo.SDK_VERSION; + + private static final Log LOG = LogFactory.getLog(AmazonSQSExtendedClient.class); + static final String LEGACY_RESERVED_ATTRIBUTE_NAME = "SQSLargePayloadSize"; + static final List RESERVED_ATTRIBUTE_NAMES = Arrays.asList(LEGACY_RESERVED_ATTRIBUTE_NAME, + SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME); + private ExtendedClientConfiguration clientConfiguration; + private PayloadStore payloadStore; + + /** + * Constructs a new Amazon SQS extended client to invoke service methods on + * Amazon SQS with extended functionality using the specified Amazon SQS + * client object. + * + *

+ * All service calls made using this new client object are blocking, and + * will not return until the service call completes. + * + * @param sqsClient + * The Amazon SQS client to use to connect to Amazon SQS. + */ + public AmazonSQSExtendedClient(SqsClient sqsClient) { + this(sqsClient, new ExtendedClientConfiguration()); + } + + /** + * Constructs a new Amazon SQS extended client to invoke service methods on + * Amazon SQS with extended functionality using the specified Amazon SQS + * client object. + * + *

+ * All service calls made using this new client object are blocking, and + * will not return until the service call completes. + * + * @param sqsClient + * The Amazon SQS client to use to connect to Amazon SQS. + * @param extendedClientConfig + * The extended client configuration options controlling the + * functionality of this client. + */ + public AmazonSQSExtendedClient(SqsClient sqsClient, ExtendedClientConfiguration extendedClientConfig) { + super(sqsClient); + this.clientConfiguration = new ExtendedClientConfiguration(extendedClientConfig); + S3Dao s3Dao = new S3Dao(clientConfiguration.getS3Client(), + clientConfiguration.getServerSideEncryptionStrategy(), + clientConfiguration.getObjectCannedACL()); + this.payloadStore = new S3BackedPayloadStore(s3Dao, clientConfiguration.getS3BucketName()); + } + + /** + *

+ * Delivers a message to the specified queue. + *

+ * + *

+ * A message can include only XML, JSON, and unformatted text. The following Unicode characters are allowed: + *

+ *

+ * #x9 | #xA | #xD | #x20 to #xD7FF | + * #xE000 to #xFFFD | #x10000 to #x10FFFF + *

+ *

+ * Any characters not included in this list will be rejected. For more information, see the W3C specification for characters. + *

+ *
+ * + * @param sendMessageRequest + * @return Result of the SendMessage operation returned by the service. + * @throws InvalidMessageContentsException + * The message contains characters outside the allowed set. + * @throws UnsupportedOperationException + * Error code 400. Unsupported operation. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.SendMessage + * @see AWS API + * Documentation + */ + public SendMessageResponse sendMessage(SendMessageRequest sendMessageRequest) { + //TODO: Clone request since it's modified in this method and will cause issues if the client reuses request object. + if (sendMessageRequest == null) { + String errorMessage = "sendMessageRequest cannot be null."; + LOG.error(errorMessage); + throw SdkClientException.create(errorMessage); + } + + SendMessageRequest.Builder sendMessageRequestBuilder = sendMessageRequest.toBuilder(); + sendMessageRequest = appendUserAgent(sendMessageRequestBuilder).build(); + + if (!clientConfiguration.isPayloadSupportEnabled()) { + return super.sendMessage(sendMessageRequest); + } + + if (StringUtils.isEmpty(sendMessageRequest.messageBody())) { + String errorMessage = "messageBody cannot be null or empty."; + LOG.error(errorMessage); + throw SdkClientException.create(errorMessage); + } + + //Check message attributes for ExtendedClient related constraints + checkMessageAttributes(sendMessageRequest.messageAttributes()); + + if (clientConfiguration.isAlwaysThroughS3() || isLarge(sendMessageRequest)) { + sendMessageRequest = storeMessageInS3(sendMessageRequest); + } + return super.sendMessage(sendMessageRequest); + } + + /** + *

+ * Retrieves one or more messages (up to 10), from the specified queue. Using the WaitTimeSeconds + * parameter enables long-poll support. For more information, see Amazon + * SQS Long Polling in the Amazon Simple Queue Service Developer Guide. + *

+ *

+ * Short poll is the default behavior where a weighted random set of machines is sampled on a + * ReceiveMessage call. Thus, only the messages on the sampled machines are returned. If the number of + * messages in the queue is small (fewer than 1,000), you most likely get fewer messages than you requested per + * ReceiveMessage call. If the number of messages in the queue is extremely small, you might not + * receive any messages in a particular ReceiveMessage response. If this happens, repeat the request. + *

+ *

+ * For each message returned, the response includes the following: + *

+ * + *

+ * The receipt handle is the identifier you must provide when deleting the message. For more information, see Queue and Message Identifiers in the Amazon Simple Queue Service Developer Guide. + *

+ *

+ * You can provide the VisibilityTimeout parameter in your request. The parameter is applied to the + * messages that Amazon SQS returns in the response. If you don't include the parameter, the overall visibility + * timeout for the queue is used for the returned messages. For more information, see Visibility Timeout in the Amazon Simple Queue Service Developer Guide. + *

+ *

+ * A message that isn't deleted or a message whose visibility isn't extended before the visibility timeout expires + * counts as a failed receive. Depending on the configuration of the queue, the message might be sent to the + * dead-letter queue. + *

+ * + *

+ * In the future, new attributes might be added. If you write code that calls this action, we recommend that you + * structure your code so that it can handle new attributes gracefully. + *

+ *
+ * + * @param receiveMessageRequest + * @return Result of the ReceiveMessage operation returned by the service. + * @throws OverLimitException + * The specified action violates a limit. For example, ReceiveMessage returns this error if the + * maximum number of inflight messages is reached and AddPermission returns this error if the + * maximum number of permissions for the queue is reached. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.ReceiveMessage + * @see AWS API + * Documentation + */ + public ReceiveMessageResponse receiveMessage(ReceiveMessageRequest receiveMessageRequest) { + //TODO: Clone request since it's modified in this method and will cause issues if the client reuses request object. + if (receiveMessageRequest == null) { + String errorMessage = "receiveMessageRequest cannot be null."; + LOG.error(errorMessage); + throw SdkClientException.create(errorMessage); + } + + ReceiveMessageRequest.Builder receiveMessageRequestBuilder = receiveMessageRequest.toBuilder(); + appendUserAgent(receiveMessageRequestBuilder); + + if (!clientConfiguration.isPayloadSupportEnabled()) { + return super.receiveMessage(receiveMessageRequestBuilder.build()); + } + //Remove before adding to avoid any duplicates + List messageAttributeNames = new ArrayList<>(receiveMessageRequest.messageAttributeNames()); + messageAttributeNames.removeAll(RESERVED_ATTRIBUTE_NAMES); + messageAttributeNames.addAll(RESERVED_ATTRIBUTE_NAMES); + receiveMessageRequestBuilder.messageAttributeNames(messageAttributeNames); + receiveMessageRequest = receiveMessageRequestBuilder.build(); + + ReceiveMessageResponse receiveMessageResponse = super.receiveMessage(receiveMessageRequest); + ReceiveMessageResponse.Builder receiveMessageResponseBuilder = receiveMessageResponse.toBuilder(); + + List messages = receiveMessageResponse.messages(); + List modifiedMessages = new ArrayList<>(messages.size()); + for (Message message : messages) { + Message.Builder messageBuilder = message.toBuilder(); + + // for each received message check if they are stored in S3. + Optional largePayloadAttributeName = getReservedAttributeNameIfPresent(message.messageAttributes()); + if (largePayloadAttributeName.isPresent()) { + String largeMessagePointer = message.body(); + largeMessagePointer = largeMessagePointer.replace("com.amazon.sqs.javamessaging.MessageS3Pointer", "software.amazon.payloadoffloading.PayloadS3Pointer"); + + messageBuilder.body(payloadStore.getOriginalPayload(largeMessagePointer)); + + // remove the additional attribute before returning the message + // to user. + Map messageAttributes = new HashMap<>(message.messageAttributes()); + messageAttributes.keySet().removeAll(RESERVED_ATTRIBUTE_NAMES); + messageBuilder.messageAttributes(messageAttributes); + + // Embed s3 object pointer in the receipt handle. + String modifiedReceiptHandle = embedS3PointerInReceiptHandle( + message.receiptHandle(), + largeMessagePointer); + + messageBuilder.receiptHandle(modifiedReceiptHandle); + } + modifiedMessages.add(messageBuilder.build()); + } + + receiveMessageResponseBuilder.messages(modifiedMessages); + return receiveMessageResponseBuilder.build(); + } + + /** + *

+ * Deletes the specified message from the specified queue. To select the message to delete, use the + * ReceiptHandle of the message (not the MessageId which you receive when you send + * the message). Amazon SQS can delete a message from a queue even if a visibility timeout setting causes the + * message to be locked by another consumer. Amazon SQS automatically deletes messages left in a queue longer than + * the retention period configured for the queue. + *

+ * + *

+ * The ReceiptHandle is associated with a specific instance of receiving a message. If you + * receive a message more than once, the ReceiptHandle is different each time you receive a message. + * When you use the DeleteMessage action, you must provide the most recently received + * ReceiptHandle for the message (otherwise, the request succeeds, but the message might not be + * deleted). + *

+ *

+ * For standard queues, it is possible to receive a message even after you delete it. This might happen on rare + * occasions if one of the servers which stores a copy of the message is unavailable when you send the request to + * delete the message. The copy remains on the server and might be returned to you during a subsequent receive + * request. You should ensure that your application is idempotent, so that receiving a message more than once does + * not cause issues. + *

+ *
+ * + * @param deleteMessageRequest + * @return Result of the DeleteMessage operation returned by the service. + * @throws InvalidIdFormatException + * The specified receipt handle isn't valid for the current version. + * @throws ReceiptHandleIsInvalidException + * The specified receipt handle isn't valid. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.DeleteMessage + * @see AWS API + * Documentation + */ + public DeleteMessageResponse deleteMessage(DeleteMessageRequest deleteMessageRequest) { + + if (deleteMessageRequest == null) { + String errorMessage = "deleteMessageRequest cannot be null."; + LOG.error(errorMessage); + throw SdkClientException.create(errorMessage); + } + + DeleteMessageRequest.Builder deleteMessageRequestBuilder = deleteMessageRequest.toBuilder(); + appendUserAgent(deleteMessageRequestBuilder); + + if (!clientConfiguration.isPayloadSupportEnabled()) { + return super.deleteMessage(deleteMessageRequestBuilder.build()); + } + + String receiptHandle = deleteMessageRequest.receiptHandle(); + String origReceiptHandle = receiptHandle; + + // Update original receipt handle if needed + if (isS3ReceiptHandle(receiptHandle)) { + origReceiptHandle = getOrigReceiptHandle(receiptHandle); + // Delete pay load from S3 if needed + if (clientConfiguration.doesCleanupS3Payload()) { + String messagePointer = getMessagePointerFromModifiedReceiptHandle(receiptHandle); + payloadStore.deleteOriginalPayload(messagePointer); + } + } + + deleteMessageRequestBuilder.receiptHandle(origReceiptHandle); + return super.deleteMessage(deleteMessageRequestBuilder.build()); + } + + /** + *

+ * Changes the visibility timeout of a specified message in a queue to a new value. The default visibility timeout + * for a message is 30 seconds. The minimum is 0 seconds. The maximum is 12 hours. For more information, see + * Visibility Timeout in the Amazon Simple Queue Service Developer Guide. + *

+ *

+ * For example, you have a message with a visibility timeout of 5 minutes. After 3 minutes, you call + * ChangeMessageVisibility with a timeout of 10 minutes. You can continue to call + * ChangeMessageVisibility to extend the visibility timeout to the maximum allowed time. If you try to + * extend the visibility timeout beyond the maximum, your request is rejected. + *

+ *

+ * An Amazon SQS message has three basic states: + *

+ *
    + *
  1. + *

    + * Sent to a queue by a producer. + *

    + *
  2. + *
  3. + *

    + * Received from the queue by a consumer. + *

    + *
  4. + *
  5. + *

    + * Deleted from the queue. + *

    + *
  6. + *
+ *

+ * A message is considered to be stored after it is sent to a queue by a producer, but not yet received from + * the queue by a consumer (that is, between states 1 and 2). There is no limit to the number of stored messages. A + * message is considered to be in flight after it is received from a queue by a consumer, but not yet deleted + * from the queue (that is, between states 2 and 3). There is a limit to the number of inflight messages. + *

+ *

+ * Limits that apply to inflight messages are unrelated to the unlimited number of stored messages. + *

+ *

+ * For most standard queues (depending on queue traffic and message backlog), there can be a maximum of + * approximately 120,000 inflight messages (received from a queue by a consumer, but not yet deleted from the + * queue). If you reach this limit, Amazon SQS returns the OverLimit error message. To avoid reaching + * the limit, you should delete messages from the queue after they're processed. You can also increase the number of + * queues you use to process your messages. To request a limit increase, file a support request. + *

+ *

+ * For FIFO queues, there can be a maximum of 20,000 inflight messages (received from a queue by a consumer, but not + * yet deleted from the queue). If you reach this limit, Amazon SQS returns no error messages. + *

+ * + *

+ * If you attempt to set the VisibilityTimeout to a value greater than the maximum time left, Amazon + * SQS returns an error. Amazon SQS doesn't automatically recalculate and increase the timeout to the maximum + * remaining time. + *

+ *

+ * Unlike with a queue, when you change the visibility timeout for a specific message the timeout value is applied + * immediately but isn't saved in memory for that message. If you don't delete a message after it is received, the + * visibility timeout for the message reverts to the original timeout value (not to the value you set using the + * ChangeMessageVisibility action) the next time the message is received. + *

+ *
+ * + * @param changeMessageVisibilityRequest + * @return Result of the ChangeMessageVisibility operation returned by the service. + * @throws MessageNotInflightException + * The specified message isn't in flight. + * @throws ReceiptHandleIsInvalidException + * The specified receipt handle isn't valid. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.ChangeMessageVisibility + * @see AWS + * API Documentation + */ + public ChangeMessageVisibilityResponse changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) + throws AwsServiceException, SdkClientException { + + ChangeMessageVisibilityRequest.Builder changeMessageVisibilityRequestBuilder = changeMessageVisibilityRequest.toBuilder(); + if (isS3ReceiptHandle(changeMessageVisibilityRequest.receiptHandle())) { + changeMessageVisibilityRequestBuilder.receiptHandle( + getOrigReceiptHandle(changeMessageVisibilityRequest.receiptHandle())); + } + return amazonSqsToBeExtended.changeMessageVisibility(changeMessageVisibilityRequestBuilder.build()); + } + + /** + *

+ * Delivers up to ten messages to the specified queue. This is a batch version of SendMessage. + * For a FIFO queue, multiple messages within a single batch are enqueued in the order they are sent. + *

+ *

+ * The result of sending each message is reported individually in the response. Because the batch request can result + * in a combination of successful and unsuccessful actions, you should check for batch errors even when the call + * returns an HTTP status code of 200. + *

+ *

+ * The maximum allowed individual message size and the maximum total payload size (the sum of the individual lengths + * of all of the batched messages) are both 256 KB (262,144 bytes). + *

+ * + *

+ * A message can include only XML, JSON, and unformatted text. The following Unicode characters are allowed: + *

+ *

+ * #x9 | #xA | #xD | #x20 to #xD7FF | + * #xE000 to #xFFFD | #x10000 to #x10FFFF + *

+ *

+ * Any characters not included in this list will be rejected. For more information, see the W3C specification for characters. + *

+ *
+ *

+ * If you don't specify the DelaySeconds parameter for an entry, Amazon SQS uses the default value for + * the queue. + *

+ *

+ * Some actions take lists of parameters. These lists are specified using the param.n notation. Values + * of n are integers starting from 1. For example, a parameter list with two elements looks like this: + *

+ *

+ * &AttributeName.1=first + *

+ *

+ * &AttributeName.2=second + *

+ * + * @param sendMessageBatchRequest + * @return Result of the SendMessageBatch operation returned by the service. + * @throws TooManyEntriesInBatchRequestException + * The batch request contains more entries than permissible. + * @throws EmptyBatchRequestException + * The batch request doesn't contain any entries. + * @throws BatchEntryIdsNotDistinctException + * Two or more batch entries in the request have the same Id. + * @throws BatchRequestTooLongException + * The length of all the messages put together is more than the limit. + * @throws InvalidBatchEntryIdException + * The Id of a batch entry in a batch request doesn't abide by the specification. + * @throws UnsupportedOperationException + * Error code 400. Unsupported operation. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.SendMessageBatch + * @see AWS API + * Documentation + */ + public SendMessageBatchResponse sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) { + + if (sendMessageBatchRequest == null) { + String errorMessage = "sendMessageBatchRequest cannot be null."; + LOG.error(errorMessage); + throw SdkClientException.create(errorMessage); + } + + SendMessageBatchRequest.Builder sendMessageBatchRequestBuilder = sendMessageBatchRequest.toBuilder(); + appendUserAgent(sendMessageBatchRequestBuilder); + sendMessageBatchRequest = sendMessageBatchRequestBuilder.build(); + + if (!clientConfiguration.isPayloadSupportEnabled()) { + return super.sendMessageBatch(sendMessageBatchRequest); + } + + List batchEntries = new ArrayList<>(sendMessageBatchRequest.entries().size()); + + boolean hasS3Entries = false; + for (SendMessageBatchRequestEntry entry : sendMessageBatchRequest.entries()) { + //Check message attributes for ExtendedClient related constraints + checkMessageAttributes(entry.messageAttributes()); + + if (clientConfiguration.isAlwaysThroughS3() || isLarge(entry)) { + entry = storeMessageInS3(entry); + hasS3Entries = true; + } + batchEntries.add(entry); + } + + if (hasS3Entries) { + sendMessageBatchRequest = sendMessageBatchRequest.toBuilder().entries(batchEntries).build(); + } + + return super.sendMessageBatch(sendMessageBatchRequest); + } + + /** + *

+ * Deletes up to ten messages from the specified queue. This is a batch version of + * DeleteMessage. The result of the action on each message is reported individually in the + * response. + *

+ * + *

+ * Because the batch request can result in a combination of successful and unsuccessful actions, you should check + * for batch errors even when the call returns an HTTP status code of 200. + *

+ *
+ *

+ * Some actions take lists of parameters. These lists are specified using the param.n notation. Values + * of n are integers starting from 1. For example, a parameter list with two elements looks like this: + *

+ *

+ * &AttributeName.1=first + *

+ *

+ * &AttributeName.2=second + *

+ * + * @param deleteMessageBatchRequest + * @return Result of the DeleteMessageBatch operation returned by the service. + * @throws TooManyEntriesInBatchRequestException + * The batch request contains more entries than permissible. + * @throws EmptyBatchRequestException + * The batch request doesn't contain any entries. + * @throws BatchEntryIdsNotDistinctException + * Two or more batch entries in the request have the same Id. + * @throws InvalidBatchEntryIdException + * The Id of a batch entry in a batch request doesn't abide by the specification. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.DeleteMessageBatch + * @see AWS API + * Documentation + */ + public DeleteMessageBatchResponse deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) { + + if (deleteMessageBatchRequest == null) { + String errorMessage = "deleteMessageBatchRequest cannot be null."; + LOG.error(errorMessage); + throw SdkClientException.create(errorMessage); + } + + DeleteMessageBatchRequest.Builder deleteMessageBatchRequestBuilder = deleteMessageBatchRequest.toBuilder(); + appendUserAgent(deleteMessageBatchRequestBuilder); + + if (!clientConfiguration.isPayloadSupportEnabled()) { + return super.deleteMessageBatch(deleteMessageBatchRequest); + } + + List entries = new ArrayList<>(deleteMessageBatchRequest.entries().size()); + for (DeleteMessageBatchRequestEntry entry : deleteMessageBatchRequest.entries()) { + DeleteMessageBatchRequestEntry.Builder entryBuilder = entry.toBuilder(); + String receiptHandle = entry.receiptHandle(); + String origReceiptHandle = receiptHandle; + + // Update original receipt handle if needed + if (isS3ReceiptHandle(receiptHandle)) { + origReceiptHandle = getOrigReceiptHandle(receiptHandle); + // Delete s3 payload if needed + if (clientConfiguration.doesCleanupS3Payload()) { + String messagePointer = getMessagePointerFromModifiedReceiptHandle(receiptHandle); + payloadStore.deleteOriginalPayload(messagePointer); + } + } + + entryBuilder.receiptHandle(origReceiptHandle); + entries.add(entryBuilder.build()); + } + + deleteMessageBatchRequestBuilder.entries(entries); + return super.deleteMessageBatch(deleteMessageBatchRequestBuilder.build()); + } + + /** + *

+ * Changes the visibility timeout of multiple messages. This is a batch version of + * ChangeMessageVisibility. The result of the action on each message is reported individually + * in the response. You can send up to 10 ChangeMessageVisibility requests with each + * ChangeMessageVisibilityBatch action. + *

+ * + *

+ * Because the batch request can result in a combination of successful and unsuccessful actions, you should check + * for batch errors even when the call returns an HTTP status code of 200. + *

+ *
+ *

+ * Some actions take lists of parameters. These lists are specified using the param.n notation. Values + * of n are integers starting from 1. For example, a parameter list with two elements looks like this: + *

+ *

+ * &AttributeName.1=first + *

+ *

+ * &AttributeName.2=second + *

+ * + * @param changeMessageVisibilityBatchRequest + * @return Result of the ChangeMessageVisibilityBatch operation returned by the service. + * @throws TooManyEntriesInBatchRequestException + * The batch request contains more entries than permissible. + * @throws EmptyBatchRequestException + * The batch request doesn't contain any entries. + * @throws BatchEntryIdsNotDistinctException + * Two or more batch entries in the request have the same Id. + * @throws InvalidBatchEntryIdException + * The Id of a batch entry in a batch request doesn't abide by the specification. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.ChangeMessageVisibilityBatch + * @see AWS API Documentation + */ + public ChangeMessageVisibilityBatchResponse changeMessageVisibilityBatch( + ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws AwsServiceException, + SdkClientException { + + List entries = new ArrayList<>(changeMessageVisibilityBatchRequest.entries().size()); + for (ChangeMessageVisibilityBatchRequestEntry entry : changeMessageVisibilityBatchRequest.entries()) { + ChangeMessageVisibilityBatchRequestEntry.Builder entryBuilder = entry.toBuilder(); + if (isS3ReceiptHandle(entry.receiptHandle())) { + entryBuilder.receiptHandle(getOrigReceiptHandle(entry.receiptHandle())); + } + entries.add(entryBuilder.build()); + } + + return amazonSqsToBeExtended.changeMessageVisibilityBatch( + changeMessageVisibilityBatchRequest.toBuilder().entries(entries).build()); + } + + /** + *

+ * Deletes the messages in a queue specified by the QueueURL parameter. + *

+ * + *

+ * When you use the PurgeQueue action, you can't retrieve any messages deleted from a queue. + *

+ *

+ * The message deletion process takes up to 60 seconds. We recommend waiting for 60 seconds regardless of your + * queue's size. + *

+ *
+ *

+ * Messages sent to the queue before you call PurgeQueue might be received but are deleted + * within the next minute. + *

+ *

+ * Messages sent to the queue after you call PurgeQueue might be deleted while the queue is + * being purged. + *

+ * + * @param purgeQueueRequest + * @return Result of the PurgeQueue operation returned by the service. + * @throws QueueDoesNotExistException + * The specified queue doesn't exist. + * @throws PurgeQueueInProgressException + * Indicates that the specified queue previously received a PurgeQueue request within the last + * 60 seconds (the time it can take to delete the messages in the queue). + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.PurgeQueue + * @see AWS API + * Documentation + */ + public PurgeQueueResponse purgeQueue(PurgeQueueRequest purgeQueueRequest) + throws AwsServiceException, SdkClientException { + LOG.warn("Calling purgeQueue deletes SQS messages without deleting their payload from S3."); + + if (purgeQueueRequest == null) { + String errorMessage = "purgeQueueRequest cannot be null."; + LOG.error(errorMessage); + throw SdkClientException.create(errorMessage); + } + + PurgeQueueRequest.Builder purgeQueueRequestBuilder = purgeQueueRequest.toBuilder(); + appendUserAgent(purgeQueueRequestBuilder); + + return super.purgeQueue(purgeQueueRequestBuilder.build()); + } + + private void checkMessageAttributes(Map messageAttributes) { + int msgAttributesSize = getMsgAttributesSize(messageAttributes); + if (msgAttributesSize > clientConfiguration.getPayloadSizeThreshold()) { + String errorMessage = "Total size of Message attributes is " + msgAttributesSize + + " bytes which is larger than the threshold of " + clientConfiguration.getPayloadSizeThreshold() + + " Bytes. Consider including the payload in the message body instead of message attributes."; + LOG.error(errorMessage); + throw SdkClientException.create(errorMessage); + } + + int messageAttributesNum = messageAttributes.size(); + if (messageAttributesNum > SQSExtendedClientConstants.MAX_ALLOWED_ATTRIBUTES) { + String errorMessage = "Number of message attributes [" + messageAttributesNum + + "] exceeds the maximum allowed for large-payload messages [" + + SQSExtendedClientConstants.MAX_ALLOWED_ATTRIBUTES + "]."; + LOG.error(errorMessage); + throw SdkClientException.create(errorMessage); + } + Optional largePayloadAttributeName = getReservedAttributeNameIfPresent(messageAttributes); + + if (largePayloadAttributeName.isPresent()) { + String errorMessage = "Message attribute name " + largePayloadAttributeName.get() + + " is reserved for use by SQS extended client."; + LOG.error(errorMessage); + throw SdkClientException.create(errorMessage); + } + } + + /** + * TODO: Wrap the message pointer as-is to the receiptHandle so that it can be generic + * and does not use any LargeMessageStore implementation specific details. + */ + private String embedS3PointerInReceiptHandle(String receiptHandle, String pointer) { + PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(pointer); + String s3MsgBucketName = s3Pointer.getS3BucketName(); + String s3MsgKey = s3Pointer.getS3Key(); + + String modifiedReceiptHandle = SQSExtendedClientConstants.S3_BUCKET_NAME_MARKER + s3MsgBucketName + + SQSExtendedClientConstants.S3_BUCKET_NAME_MARKER + SQSExtendedClientConstants.S3_KEY_MARKER + + s3MsgKey + SQSExtendedClientConstants.S3_KEY_MARKER + receiptHandle; + return modifiedReceiptHandle; + } + + private String getOrigReceiptHandle(String receiptHandle) { + int secondOccurence = receiptHandle.indexOf(SQSExtendedClientConstants.S3_KEY_MARKER, + receiptHandle.indexOf(SQSExtendedClientConstants.S3_KEY_MARKER) + 1); + itest().given(receiptHandle, "1FOc=-..s3Key..-y*-@T-..s3Key..-").checkEq(secondOccurence, 21); + return receiptHandle.substring(secondOccurence + SQSExtendedClientConstants.S3_KEY_MARKER.length()); + } + + private String getFromReceiptHandleByMarker(String receiptHandle, String marker) { + int firstOccurence = receiptHandle.indexOf(marker); + int secondOccurence = receiptHandle.indexOf(marker, firstOccurence + 1); + return receiptHandle.substring(firstOccurence + marker.length(), secondOccurence); + } + + private boolean isS3ReceiptHandle(String receiptHandle) { + return receiptHandle.contains(SQSExtendedClientConstants.S3_BUCKET_NAME_MARKER) + && receiptHandle.contains(SQSExtendedClientConstants.S3_KEY_MARKER); + } + + private String getMessagePointerFromModifiedReceiptHandle(String receiptHandle) { + String s3MsgBucketName = getFromReceiptHandleByMarker(receiptHandle, SQSExtendedClientConstants.S3_BUCKET_NAME_MARKER); + String s3MsgKey = getFromReceiptHandleByMarker(receiptHandle, SQSExtendedClientConstants.S3_KEY_MARKER); + + PayloadS3Pointer payloadS3Pointer = new PayloadS3Pointer(s3MsgBucketName, s3MsgKey); + return payloadS3Pointer.toJson(); + } + + private boolean isLarge(SendMessageRequest sendMessageRequest) { + int msgAttributesSize = getMsgAttributesSize(sendMessageRequest.messageAttributes()); + long msgBodySize = Util.getStringSizeInBytes(sendMessageRequest.messageBody()); + long totalMsgSize = msgAttributesSize + msgBodySize; + return (totalMsgSize > clientConfiguration.getPayloadSizeThreshold()); + } + + private boolean isLarge(SendMessageBatchRequestEntry batchEntry) { + int msgAttributesSize = getMsgAttributesSize(batchEntry.messageAttributes()); + long msgBodySize = Util.getStringSizeInBytes(batchEntry.messageBody()); + long totalMsgSize = msgAttributesSize + msgBodySize; + return (totalMsgSize > clientConfiguration.getPayloadSizeThreshold()); + } + + private Optional getReservedAttributeNameIfPresent(Map msgAttributes) { + String reservedAttributeName = null; + if (msgAttributes.containsKey(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME)) { + reservedAttributeName = SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME; + } else if (msgAttributes.containsKey(LEGACY_RESERVED_ATTRIBUTE_NAME)) { + reservedAttributeName = LEGACY_RESERVED_ATTRIBUTE_NAME; + } + return Optional.ofNullable(reservedAttributeName); + } + + private int getMsgAttributesSize(Map msgAttributes) { + int totalMsgAttributesSize = 0; + for (Map.Entry entry : msgAttributes.entrySet()) { + totalMsgAttributesSize += Util.getStringSizeInBytes(entry.getKey()); + + MessageAttributeValue entryVal = entry.getValue(); + if (entryVal.dataType() != null) { + totalMsgAttributesSize += Util.getStringSizeInBytes(entryVal.dataType()); + } + + String stringVal = entryVal.stringValue(); + if (stringVal != null) { + totalMsgAttributesSize += Util.getStringSizeInBytes(entryVal.stringValue()); + } + + SdkBytes binaryVal = entryVal.binaryValue(); + if (binaryVal != null) { + totalMsgAttributesSize += binaryVal.asByteArray().length; + } + } + return totalMsgAttributesSize; + } + + private SendMessageBatchRequestEntry storeMessageInS3(SendMessageBatchRequestEntry batchEntry) { + + // Read the content of the message from message body + String messageContentStr = batchEntry.messageBody(); + + Long messageContentSize = Util.getStringSizeInBytes(messageContentStr); + + SendMessageBatchRequestEntry.Builder batchEntryBuilder = batchEntry.toBuilder(); + + batchEntryBuilder.messageAttributes( + updateMessageAttributePayloadSize(batchEntry.messageAttributes(), messageContentSize)); + + // Store the message content in S3. + String largeMessagePointer = payloadStore.storeOriginalPayload(messageContentStr); + batchEntryBuilder.messageBody(largeMessagePointer); + + return batchEntryBuilder.build(); + } + + private SendMessageRequest storeMessageInS3(SendMessageRequest sendMessageRequest) { + + // Read the content of the message from message body + String messageContentStr = sendMessageRequest.messageBody(); + + Long messageContentSize = Util.getStringSizeInBytes(messageContentStr); + + SendMessageRequest.Builder sendMessageRequestBuilder = sendMessageRequest.toBuilder(); + + sendMessageRequestBuilder.messageAttributes( + updateMessageAttributePayloadSize(sendMessageRequest.messageAttributes(), messageContentSize)); + + // Store the message content in S3. + String largeMessagePointer = payloadStore.storeOriginalPayload(messageContentStr); + sendMessageRequestBuilder.messageBody(largeMessagePointer); + + return sendMessageRequestBuilder.build(); + } + + private Map updateMessageAttributePayloadSize( + Map messageAttributes, Long messageContentSize) { + Map updatedMessageAttributes = new HashMap<>(messageAttributes); + + // Add a new message attribute as a flag + MessageAttributeValue.Builder messageAttributeValueBuilder = MessageAttributeValue.builder(); + messageAttributeValueBuilder.dataType("Number"); + messageAttributeValueBuilder.stringValue(messageContentSize.toString()); + MessageAttributeValue messageAttributeValue = messageAttributeValueBuilder.build(); + + if (!clientConfiguration.usesLegacyReservedAttributeName()) { + updatedMessageAttributes.put(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, messageAttributeValue); + } else { + updatedMessageAttributes.put(LEGACY_RESERVED_ATTRIBUTE_NAME, messageAttributeValue); + } + return updatedMessageAttributes; + } + + @SuppressWarnings("unchecked") + private static T appendUserAgent(final T builder) { + return (T) builder + .overrideConfiguration( + AwsRequestOverrideConfiguration.builder() + .addApiName(ApiName.builder().name(USER_AGENT_NAME) + .version(USER_AGENT_VERSION).build()) + .build()); + } +} diff --git a/demo/amazon-sqs-java-extended-client-lib/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientBase.java b/demo/amazon-sqs-java-extended-client-lib/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientBase.java new file mode 100644 index 0000000..73c9567 --- /dev/null +++ b/demo/amazon-sqs-java-extended-client-lib/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientBase.java @@ -0,0 +1,1124 @@ +/* + * Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.sqs.javamessaging; + +import java.lang.UnsupportedOperationException; + +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.AddPermissionRequest; +import software.amazon.awssdk.services.sqs.model.AddPermissionResponse; +import software.amazon.awssdk.services.sqs.model.BatchEntryIdsNotDistinctException; +import software.amazon.awssdk.services.sqs.model.BatchRequestTooLongException; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityResponse; +import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; +import software.amazon.awssdk.services.sqs.model.CreateQueueResponse; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; +import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse; +import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest; +import software.amazon.awssdk.services.sqs.model.DeleteQueueResponse; +import software.amazon.awssdk.services.sqs.model.EmptyBatchRequestException; +import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest; +import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse; +import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; +import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse; +import software.amazon.awssdk.services.sqs.model.InvalidAttributeNameException; +import software.amazon.awssdk.services.sqs.model.InvalidBatchEntryIdException; +import software.amazon.awssdk.services.sqs.model.InvalidIdFormatException; +import software.amazon.awssdk.services.sqs.model.InvalidMessageContentsException; +import software.amazon.awssdk.services.sqs.model.ListDeadLetterSourceQueuesRequest; +import software.amazon.awssdk.services.sqs.model.ListDeadLetterSourceQueuesResponse; +import software.amazon.awssdk.services.sqs.model.ListQueueTagsRequest; +import software.amazon.awssdk.services.sqs.model.ListQueueTagsResponse; +import software.amazon.awssdk.services.sqs.model.ListQueuesRequest; +import software.amazon.awssdk.services.sqs.model.ListQueuesResponse; +import software.amazon.awssdk.services.sqs.model.MessageNotInflightException; +import software.amazon.awssdk.services.sqs.model.OverLimitException; +import software.amazon.awssdk.services.sqs.model.PurgeQueueInProgressException; +import software.amazon.awssdk.services.sqs.model.PurgeQueueRequest; +import software.amazon.awssdk.services.sqs.model.PurgeQueueResponse; +import software.amazon.awssdk.services.sqs.model.QueueDeletedRecentlyException; +import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException; +import software.amazon.awssdk.services.sqs.model.QueueNameExistsException; +import software.amazon.awssdk.services.sqs.model.ReceiptHandleIsInvalidException; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; +import software.amazon.awssdk.services.sqs.model.RemovePermissionRequest; +import software.amazon.awssdk.services.sqs.model.RemovePermissionResponse; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageResponse; +import software.amazon.awssdk.services.sqs.model.SetQueueAttributesRequest; +import software.amazon.awssdk.services.sqs.model.SetQueueAttributesResponse; +import software.amazon.awssdk.services.sqs.model.SqsException; +import software.amazon.awssdk.services.sqs.model.TagQueueRequest; +import software.amazon.awssdk.services.sqs.model.TagQueueResponse; +import software.amazon.awssdk.services.sqs.model.TooManyEntriesInBatchRequestException; +import software.amazon.awssdk.services.sqs.model.UntagQueueRequest; +import software.amazon.awssdk.services.sqs.model.UntagQueueResponse; + +abstract class AmazonSQSExtendedClientBase implements SqsClient { + SqsClient amazonSqsToBeExtended; + + public AmazonSQSExtendedClientBase(SqsClient sqsClient) { + amazonSqsToBeExtended = sqsClient; + } + + /** + *

+ * Delivers a message to the specified queue. + *

+ * + *

+ * A message can include only XML, JSON, and unformatted text. The following Unicode characters are allowed: + *

+ *

+ * #x9 | #xA | #xD | #x20 to #xD7FF | + * #xE000 to #xFFFD | #x10000 to #x10FFFF + *

+ *

+ * Any characters not included in this list will be rejected. For more information, see the W3C specification for characters. + *

+ *
+ * + * @param sendMessageRequest + * @return Result of the SendMessage operation returned by the service. + * @throws InvalidMessageContentsException + * The message contains characters outside the allowed set. + * @throws UnsupportedOperationException + * Error code 400. Unsupported operation. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.SendMessage + * @see AWS API + * Documentation + */ + public SendMessageResponse sendMessage(SendMessageRequest sendMessageRequest) throws + InvalidMessageContentsException, UnsupportedOperationException, SdkException, SdkClientException, SqsException { + return amazonSqsToBeExtended.sendMessage(sendMessageRequest); + } + + /** + *

+ * Retrieves one or more messages (up to 10), from the specified queue. Using the WaitTimeSeconds + * parameter enables long-poll support. For more information, see Amazon + * SQS Long Polling in the Amazon Simple Queue Service Developer Guide. + *

+ *

+ * Short poll is the default behavior where a weighted random set of machines is sampled on a + * ReceiveMessage call. Thus, only the messages on the sampled machines are returned. If the number of + * messages in the queue is small (fewer than 1,000), you most likely get fewer messages than you requested per + * ReceiveMessage call. If the number of messages in the queue is extremely small, you might not + * receive any messages in a particular ReceiveMessage response. If this happens, repeat the request. + *

+ *

+ * For each message returned, the response includes the following: + *

+ *
    + *
  • + *

    + * The message body. + *

    + *
  • + *
  • + *

    + * An MD5 digest of the message body. For information about MD5, see RFC1321. + *

    + *
  • + *
  • + *

    + * The MessageId you received when you sent the message to the queue. + *

    + *
  • + *
  • + *

    + * The receipt handle. + *

    + *
  • + *
  • + *

    + * The message attributes. + *

    + *
  • + *
  • + *

    + * An MD5 digest of the message attributes. + *

    + *
  • + *
+ *

+ * The receipt handle is the identifier you must provide when deleting the message. For more information, see Queue and Message Identifiers in the Amazon Simple Queue Service Developer Guide. + *

+ *

+ * You can provide the VisibilityTimeout parameter in your request. The parameter is applied to the + * messages that Amazon SQS returns in the response. If you don't include the parameter, the overall visibility + * timeout for the queue is used for the returned messages. For more information, see Visibility Timeout in the Amazon Simple Queue Service Developer Guide. + *

+ *

+ * A message that isn't deleted or a message whose visibility isn't extended before the visibility timeout expires + * counts as a failed receive. Depending on the configuration of the queue, the message might be sent to the + * dead-letter queue. + *

+ * + *

+ * In the future, new attributes might be added. If you write code that calls this action, we recommend that you + * structure your code so that it can handle new attributes gracefully. + *

+ *
+ * + * @param receiveMessageRequest + * @return Result of the ReceiveMessage operation returned by the service. + * @throws OverLimitException + * The specified action violates a limit. For example, ReceiveMessage returns this error if the + * maximum number of inflight messages is reached and AddPermission returns this error if the + * maximum number of permissions for the queue is reached. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.ReceiveMessage + * @see AWS API + * Documentation + */ + public ReceiveMessageResponse receiveMessage(ReceiveMessageRequest receiveMessageRequest) { + return amazonSqsToBeExtended.receiveMessage(receiveMessageRequest); + } + + /** + *

+ * Deletes the specified message from the specified queue. To select the message to delete, use the + * ReceiptHandle of the message (not the MessageId which you receive when you send + * the message). Amazon SQS can delete a message from a queue even if a visibility timeout setting causes the + * message to be locked by another consumer. Amazon SQS automatically deletes messages left in a queue longer than + * the retention period configured for the queue. + *

+ * + *

+ * The ReceiptHandle is associated with a specific instance of receiving a message. If you + * receive a message more than once, the ReceiptHandle is different each time you receive a message. + * When you use the DeleteMessage action, you must provide the most recently received + * ReceiptHandle for the message (otherwise, the request succeeds, but the message might not be + * deleted). + *

+ *

+ * For standard queues, it is possible to receive a message even after you delete it. This might happen on rare + * occasions if one of the servers which stores a copy of the message is unavailable when you send the request to + * delete the message. The copy remains on the server and might be returned to you during a subsequent receive + * request. You should ensure that your application is idempotent, so that receiving a message more than once does + * not cause issues. + *

+ *
+ * + * @param deleteMessageRequest + * @return Result of the DeleteMessage operation returned by the service. + * @throws InvalidIdFormatException + * The specified receipt handle isn't valid for the current version. + * @throws ReceiptHandleIsInvalidException + * The specified receipt handle isn't valid. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.DeleteMessage + * @see AWS API + * Documentation + */ + public DeleteMessageResponse deleteMessage(DeleteMessageRequest deleteMessageRequest) throws + InvalidIdFormatException, ReceiptHandleIsInvalidException, SdkException, SdkClientException, SqsException { + return amazonSqsToBeExtended.deleteMessage(deleteMessageRequest); + } + + /** + *

+ * Sets the value of one or more queue attributes. When you change a queue's attributes, the change can take up to + * 60 seconds for most of the attributes to propagate throughout the Amazon SQS system. Changes made to the + * MessageRetentionPeriod attribute can take up to 15 minutes. + *

+ * + *
    + *
  • + *

    + * In the future, new attributes might be added. If you write code that calls this action, we recommend that you + * structure your code so that it can handle new attributes gracefully. + *

    + *
  • + *
  • + *

    + * Cross-account permissions don't apply to this action. For more information, see Grant Cross-Account Permissions to a Role and a User Name in the Amazon Simple Queue Service Developer + * Guide. + *

    + *
  • + *
  • + *

    + * To remove the ability to change queue permissions, you must deny permission to the AddPermission, + * RemovePermission, and SetQueueAttributes actions in your IAM policy. + *

    + *
  • + *
+ *
+ * + * @param setQueueAttributesRequest + * @return Result of the SetQueueAttributes operation returned by the service. + * @throws InvalidAttributeNameException + * The specified attribute doesn't exist. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.SetQueueAttributes + * @see AWS API + * Documentation + */ + public SetQueueAttributesResponse setQueueAttributes(SetQueueAttributesRequest setQueueAttributesRequest) + throws InvalidAttributeNameException, AwsServiceException, SdkClientException, SqsException { + + return amazonSqsToBeExtended.setQueueAttributes(setQueueAttributesRequest); + } + + /** + *

+ * Changes the visibility timeout of multiple messages. This is a batch version of + * ChangeMessageVisibility. The result of the action on each message is reported individually + * in the response. You can send up to 10 ChangeMessageVisibility requests with each + * ChangeMessageVisibilityBatch action. + *

+ * + *

+ * Because the batch request can result in a combination of successful and unsuccessful actions, you should check + * for batch errors even when the call returns an HTTP status code of 200. + *

+ *
+ *

+ * Some actions take lists of parameters. These lists are specified using the param.n notation. Values + * of n are integers starting from 1. For example, a parameter list with two elements looks like this: + *

+ *

+ * &AttributeName.1=first + *

+ *

+ * &AttributeName.2=second + *

+ * + * @param changeMessageVisibilityBatchRequest + * @return Result of the ChangeMessageVisibilityBatch operation returned by the service. + * @throws TooManyEntriesInBatchRequestException + * The batch request contains more entries than permissible. + * @throws EmptyBatchRequestException + * The batch request doesn't contain any entries. + * @throws BatchEntryIdsNotDistinctException + * Two or more batch entries in the request have the same Id. + * @throws InvalidBatchEntryIdException + * The Id of a batch entry in a batch request doesn't abide by the specification. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.ChangeMessageVisibilityBatch + * @see AWS API Documentation + */ + public ChangeMessageVisibilityBatchResponse changeMessageVisibilityBatch( + ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws AwsServiceException, + SdkClientException { + + return amazonSqsToBeExtended.changeMessageVisibilityBatch(changeMessageVisibilityBatchRequest); + } + + /** + *

+ * Changes the visibility timeout of a specified message in a queue to a new value. The default visibility timeout + * for a message is 30 seconds. The minimum is 0 seconds. The maximum is 12 hours. For more information, see + * Visibility Timeout in the Amazon Simple Queue Service Developer Guide. + *

+ *

+ * For example, you have a message with a visibility timeout of 5 minutes. After 3 minutes, you call + * ChangeMessageVisibility with a timeout of 10 minutes. You can continue to call + * ChangeMessageVisibility to extend the visibility timeout to the maximum allowed time. If you try to + * extend the visibility timeout beyond the maximum, your request is rejected. + *

+ *

+ * An Amazon SQS message has three basic states: + *

+ *
    + *
  1. + *

    + * Sent to a queue by a producer. + *

    + *
  2. + *
  3. + *

    + * Received from the queue by a consumer. + *

    + *
  4. + *
  5. + *

    + * Deleted from the queue. + *

    + *
  6. + *
+ *

+ * A message is considered to be stored after it is sent to a queue by a producer, but not yet received from + * the queue by a consumer (that is, between states 1 and 2). There is no limit to the number of stored messages. A + * message is considered to be in flight after it is received from a queue by a consumer, but not yet deleted + * from the queue (that is, between states 2 and 3). There is a limit to the number of inflight messages. + *

+ *

+ * Limits that apply to inflight messages are unrelated to the unlimited number of stored messages. + *

+ *

+ * For most standard queues (depending on queue traffic and message backlog), there can be a maximum of + * approximately 120,000 inflight messages (received from a queue by a consumer, but not yet deleted from the + * queue). If you reach this limit, Amazon SQS returns the OverLimit error message. To avoid reaching + * the limit, you should delete messages from the queue after they're processed. You can also increase the number of + * queues you use to process your messages. To request a limit increase, file a support request. + *

+ *

+ * For FIFO queues, there can be a maximum of 20,000 inflight messages (received from a queue by a consumer, but not + * yet deleted from the queue). If you reach this limit, Amazon SQS returns no error messages. + *

+ * + *

+ * If you attempt to set the VisibilityTimeout to a value greater than the maximum time left, Amazon + * SQS returns an error. Amazon SQS doesn't automatically recalculate and increase the timeout to the maximum + * remaining time. + *

+ *

+ * Unlike with a queue, when you change the visibility timeout for a specific message the timeout value is applied + * immediately but isn't saved in memory for that message. If you don't delete a message after it is received, the + * visibility timeout for the message reverts to the original timeout value (not to the value you set using the + * ChangeMessageVisibility action) the next time the message is received. + *

+ *
+ * + * @param changeMessageVisibilityRequest + * @return Result of the ChangeMessageVisibility operation returned by the service. + * @throws MessageNotInflightException + * The specified message isn't in flight. + * @throws ReceiptHandleIsInvalidException + * The specified receipt handle isn't valid. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.ChangeMessageVisibility + * @see AWS + * API Documentation + */ + public ChangeMessageVisibilityResponse changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) + throws AwsServiceException, SdkClientException { + + return amazonSqsToBeExtended.changeMessageVisibility(changeMessageVisibilityRequest); + } + + /** + *

+ * Returns the URL of an existing Amazon SQS queue. + *

+ *

+ * To access a queue that belongs to another AWS account, use the QueueOwnerAWSAccountId parameter to + * specify the account ID of the queue's owner. The queue's owner must grant you permission to access the queue. For + * more information about shared queue access, see AddPermission or see Allow Developers to Write Messages to a Shared Queue in the Amazon Simple Queue Service Developer + * Guide. + *

+ * + * @param getQueueUrlRequest + * @return Result of the GetQueueUrl operation returned by the service. + * @throws QueueDoesNotExistException + * The specified queue doesn't exist. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.GetQueueUrl + * @see AWS API + * Documentation + */ + public GetQueueUrlResponse getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) throws AwsServiceException, + SdkClientException { + + return amazonSqsToBeExtended.getQueueUrl(getQueueUrlRequest); + } + + /** + *

+ * Revokes any permissions in the queue policy that matches the specified Label parameter. + *

+ * + *
    + *
  • + *

    + * Only the owner of a queue can remove permissions from it. + *

    + *
  • + *
  • + *

    + * Cross-account permissions don't apply to this action. For more information, see Grant Cross-Account Permissions to a Role and a User Name in the Amazon Simple Queue Service Developer + * Guide. + *

    + *
  • + *
  • + *

    + * To remove the ability to change queue permissions, you must deny permission to the AddPermission, + * RemovePermission, and SetQueueAttributes actions in your IAM policy. + *

    + *
  • + *
+ *
+ * + * @param removePermissionRequest + * @return Result of the RemovePermission operation returned by the service. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.RemovePermission + * @see AWS API + * Documentation + */ + public RemovePermissionResponse removePermission(RemovePermissionRequest removePermissionRequest) + throws AwsServiceException, SdkClientException { + + return amazonSqsToBeExtended.removePermission(removePermissionRequest); + } + + /** + *

+ * Gets attributes for the specified queue. + *

+ * + *

+ * To determine whether a queue is FIFO, you + * can check whether QueueName ends with the .fifo suffix. + *

+ *
+ * + * @param getQueueAttributesRequest + * @return Result of the GetQueueAttributes operation returned by the service. + * @throws InvalidAttributeNameException + * The specified attribute doesn't exist. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.GetQueueAttributes + * @see AWS API + * Documentation + */ + public GetQueueAttributesResponse getQueueAttributes(GetQueueAttributesRequest getQueueAttributesRequest) + throws AwsServiceException, SdkClientException { + + return amazonSqsToBeExtended.getQueueAttributes(getQueueAttributesRequest); + } + + /** + *

+ * Delivers up to ten messages to the specified queue. This is a batch version of SendMessage. + * For a FIFO queue, multiple messages within a single batch are enqueued in the order they are sent. + *

+ *

+ * The result of sending each message is reported individually in the response. Because the batch request can result + * in a combination of successful and unsuccessful actions, you should check for batch errors even when the call + * returns an HTTP status code of 200. + *

+ *

+ * The maximum allowed individual message size and the maximum total payload size (the sum of the individual lengths + * of all of the batched messages) are both 256 KB (262,144 bytes). + *

+ * + *

+ * A message can include only XML, JSON, and unformatted text. The following Unicode characters are allowed: + *

+ *

+ * #x9 | #xA | #xD | #x20 to #xD7FF | + * #xE000 to #xFFFD | #x10000 to #x10FFFF + *

+ *

+ * Any characters not included in this list will be rejected. For more information, see the W3C specification for characters. + *

+ *
+ *

+ * If you don't specify the DelaySeconds parameter for an entry, Amazon SQS uses the default value for + * the queue. + *

+ *

+ * Some actions take lists of parameters. These lists are specified using the param.n notation. Values + * of n are integers starting from 1. For example, a parameter list with two elements looks like this: + *

+ *

+ * &AttributeName.1=first + *

+ *

+ * &AttributeName.2=second + *

+ * + * @param sendMessageBatchRequest + * @return Result of the SendMessageBatch operation returned by the service. + * @throws TooManyEntriesInBatchRequestException + * The batch request contains more entries than permissible. + * @throws EmptyBatchRequestException + * The batch request doesn't contain any entries. + * @throws BatchEntryIdsNotDistinctException + * Two or more batch entries in the request have the same Id. + * @throws BatchRequestTooLongException + * The length of all the messages put together is more than the limit. + * @throws InvalidBatchEntryIdException + * The Id of a batch entry in a batch request doesn't abide by the specification. + * @throws UnsupportedOperationException + * Error code 400. Unsupported operation. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.SendMessageBatch + * @see AWS API + * Documentation + */ + public SendMessageBatchResponse sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) + throws AwsServiceException, SdkClientException { + + return amazonSqsToBeExtended.sendMessageBatch(sendMessageBatchRequest); + } + + /** + *

+ * Deletes the messages in a queue specified by the QueueURL parameter. + *

+ * + *

+ * When you use the PurgeQueue action, you can't retrieve any messages deleted from a queue. + *

+ *

+ * The message deletion process takes up to 60 seconds. We recommend waiting for 60 seconds regardless of your + * queue's size. + *

+ *
+ *

+ * Messages sent to the queue before you call PurgeQueue might be received but are deleted + * within the next minute. + *

+ *

+ * Messages sent to the queue after you call PurgeQueue might be deleted while the queue is + * being purged. + *

+ * + * @param purgeQueueRequest + * @return Result of the PurgeQueue operation returned by the service. + * @throws QueueDoesNotExistException + * The specified queue doesn't exist. + * @throws PurgeQueueInProgressException + * Indicates that the specified queue previously received a PurgeQueue request within the last + * 60 seconds (the time it can take to delete the messages in the queue). + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.PurgeQueue + * @see AWS API + * Documentation + */ + public PurgeQueueResponse purgeQueue(PurgeQueueRequest purgeQueueRequest) + throws AwsServiceException, SdkClientException { + + return amazonSqsToBeExtended.purgeQueue(purgeQueueRequest); + + } + + /** + *

+ * Returns a list of your queues that have the RedrivePolicy queue attribute configured with a + * dead-letter queue. + *

+ *

+ * The ListDeadLetterSourceQueues methods supports pagination. Set parameter MaxResults in + * the request to specify the maximum number of results to be returned in the response. If you do not set + * MaxResults, the response includes a maximum of 1,000 results. If you set MaxResults and + * there are additional results to display, the response includes a value for NextToken. Use + * NextToken as a parameter in your next request to ListDeadLetterSourceQueues to receive + * the next page of results. + *

+ *

+ * For more information about using dead-letter queues, see Using Amazon SQS Dead-Letter Queues in the Amazon Simple Queue Service Developer Guide. + *

+ * + * @param listDeadLetterSourceQueuesRequest + * @return Result of the ListDeadLetterSourceQueues operation returned by the service. + * @throws QueueDoesNotExistException + * The specified queue doesn't exist. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.ListDeadLetterSourceQueues + * @see AWS + * API Documentation + */ + public ListDeadLetterSourceQueuesResponse listDeadLetterSourceQueues( + ListDeadLetterSourceQueuesRequest listDeadLetterSourceQueuesRequest) throws AwsServiceException, + SdkClientException { + + return amazonSqsToBeExtended.listDeadLetterSourceQueues(listDeadLetterSourceQueuesRequest); + } + + /** + *

+ * Deletes the queue specified by the QueueUrl, regardless of the queue's contents. + *

+ * + *

+ * Be careful with the DeleteQueue action: When you delete a queue, any messages in the queue are no + * longer available. + *

+ *
+ *

+ * When you delete a queue, the deletion process takes up to 60 seconds. Requests you send involving that queue + * during the 60 seconds might succeed. For example, a SendMessage request might succeed, but + * after 60 seconds the queue and the message you sent no longer exist. + *

+ *

+ * When you delete a queue, you must wait at least 60 seconds before creating a queue with the same name. + *

+ * + *

+ * Cross-account permissions don't apply to this action. For more information, see Grant Cross-Account Permissions to a Role and a User Name in the Amazon Simple Queue Service Developer + * Guide. + *

+ *
+ * + * @param deleteQueueRequest + * @return Result of the DeleteQueue operation returned by the service. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.DeleteQueue + * @see AWS API + * Documentation + */ + public DeleteQueueResponse deleteQueue(DeleteQueueRequest deleteQueueRequest) + throws AwsServiceException, SdkClientException { + + return amazonSqsToBeExtended.deleteQueue(deleteQueueRequest); + } + + /** + *

+ * Returns a list of your queues in the current region. The response includes a maximum of 1,000 results. If you + * specify a value for the optional QueueNamePrefix parameter, only queues with a name that begins with + * the specified value are returned. + *

+ *

+ * The listQueues methods supports pagination. Set parameter MaxResults in the request to + * specify the maximum number of results to be returned in the response. If you do not set MaxResults, + * the response includes a maximum of 1,000 results. If you set MaxResults and there are additional + * results to display, the response includes a value for NextToken. Use NextToken as a + * parameter in your next request to listQueues to receive the next page of results. + *

+ * + *

+ * Cross-account permissions don't apply to this action. For more information, see Grant Cross-Account Permissions to a Role and a User Name in the Amazon Simple Queue Service Developer + * Guide. + *

+ *
+ * + * @param listQueuesRequest + * @return Result of the ListQueues operation returned by the service. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.ListQueues + * @see AWS API + * Documentation + */ + public ListQueuesResponse listQueues(ListQueuesRequest listQueuesRequest) throws AwsServiceException, + SdkClientException { + + return amazonSqsToBeExtended.listQueues(listQueuesRequest); + } + + /** + *

+ * Deletes up to ten messages from the specified queue. This is a batch version of + * DeleteMessage. The result of the action on each message is reported individually in the + * response. + *

+ * + *

+ * Because the batch request can result in a combination of successful and unsuccessful actions, you should check + * for batch errors even when the call returns an HTTP status code of 200. + *

+ *
+ *

+ * Some actions take lists of parameters. These lists are specified using the param.n notation. Values + * of n are integers starting from 1. For example, a parameter list with two elements looks like this: + *

+ *

+ * &AttributeName.1=first + *

+ *

+ * &AttributeName.2=second + *

+ * + * @param deleteMessageBatchRequest + * @return Result of the DeleteMessageBatch operation returned by the service. + * @throws TooManyEntriesInBatchRequestException + * The batch request contains more entries than permissible. + * @throws EmptyBatchRequestException + * The batch request doesn't contain any entries. + * @throws BatchEntryIdsNotDistinctException + * Two or more batch entries in the request have the same Id. + * @throws InvalidBatchEntryIdException + * The Id of a batch entry in a batch request doesn't abide by the specification. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.DeleteMessageBatch + * @see AWS API + * Documentation + */ + public DeleteMessageBatchResponse deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) + throws AwsServiceException, SdkClientException { + + return amazonSqsToBeExtended.deleteMessageBatch(deleteMessageBatchRequest); + } + + /** + *

+ * Creates a new standard or FIFO queue. You can pass one or more attributes in the request. Keep the following in + * mind: + *

+ *
    + *
  • + *

    + * If you don't specify the FifoQueue attribute, Amazon SQS creates a standard queue. + *

    + * + *

    + * You can't change the queue type after you create it and you can't convert an existing standard queue into a FIFO + * queue. You must either create a new FIFO queue for your application or delete your existing standard queue and + * recreate it as a FIFO queue. For more information, see Moving From a Standard Queue to a FIFO Queue in the Amazon Simple Queue Service Developer Guide. + *

    + *
  • + *
  • + *

    + * If you don't provide a value for an attribute, the queue is created with the default value for the attribute. + *

    + *
  • + *
  • + *

    + * If you delete a queue, you must wait at least 60 seconds before creating a queue with the same name. + *

    + *
  • + *
+ *

+ * To successfully create a new queue, you must provide a queue name that adheres to the limits + * related to queues and is unique within the scope of your queues. + *

+ * + *

+ * After you create a queue, you must wait at least one second after the queue is created to be able to use the + * queue. + *

+ *
+ *

+ * To get the queue URL, use the GetQueueUrl action. GetQueueUrl + * requires only the QueueName parameter. be aware of existing queue names: + *

+ *
    + *
  • + *

    + * If you provide the name of an existing queue along with the exact names and values of all the queue's attributes, + * CreateQueue returns the queue URL for the existing queue. + *

    + *
  • + *
  • + *

    + * If the queue name, attribute names, or attribute values don't match an existing queue, CreateQueue + * returns an error. + *

    + *
  • + *
+ *

+ * Some actions take lists of parameters. These lists are specified using the param.n notation. Values + * of n are integers starting from 1. For example, a parameter list with two elements looks like this: + *

+ *

+ * &AttributeName.1=first + *

+ *

+ * &AttributeName.2=second + *

+ * + *

+ * Cross-account permissions don't apply to this action. For more information, see Grant Cross-Account Permissions to a Role and a User Name in the Amazon Simple Queue Service Developer + * Guide. + *

+ *
+ * + * @param createQueueRequest + * @return Result of the CreateQueue operation returned by the service. + * @throws QueueDeletedRecentlyException + * You must wait 60 seconds after deleting a queue before you can create another queue with the same name. + * @throws QueueNameExistsException + * A queue with this name already exists. Amazon SQS returns this error only if the request includes + * attributes whose values differ from those of the existing queue. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.CreateQueue + * @see AWS API + * Documentation + */ + public CreateQueueResponse createQueue(CreateQueueRequest createQueueRequest) + throws AwsServiceException, SdkClientException { + + return amazonSqsToBeExtended.createQueue(createQueueRequest); + } + + /** + *

+ * Adds a permission to a queue for a specific principal. This allows sharing access + * to the queue. + *

+ *

+ * When you create a queue, you have full control access rights for the queue. Only you, the owner of the queue, can + * grant or deny permissions to the queue. For more information about these permissions, see Allow Developers to Write Messages to a Shared Queue in the Amazon Simple Queue Service Developer + * Guide. + *

+ * + *
    + *
  • + *

    + * AddPermission generates a policy for you. You can use SetQueueAttributes to + * upload your policy. For more information, see Using Custom Policies with the Amazon SQS Access Policy Language in the Amazon Simple Queue Service + * Developer Guide. + *

    + *
  • + *
  • + *

    + * An Amazon SQS policy can have a maximum of 7 actions. + *

    + *
  • + *
  • + *

    + * To remove the ability to change queue permissions, you must deny permission to the AddPermission, + * RemovePermission, and SetQueueAttributes actions in your IAM policy. + *

    + *
  • + *
+ *
+ *

+ * Some actions take lists of parameters. These lists are specified using the param.n notation. Values + * of n are integers starting from 1. For example, a parameter list with two elements looks like this: + *

+ *

+ * &AttributeName.1=first + *

+ *

+ * &AttributeName.2=second + *

+ * + *

+ * Cross-account permissions don't apply to this action. For more information, see Grant Cross-Account Permissions to a Role and a User Name in the Amazon Simple Queue Service Developer + * Guide. + *

+ *
+ * + * @param addPermissionRequest + * @return Result of the AddPermission operation returned by the service. + * @throws OverLimitException + * The specified action violates a limit. For example, ReceiveMessage returns this error if the + * maximum number of inflight messages is reached and AddPermission returns this error if the + * maximum number of permissions for the queue is reached. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.AddPermission + * @see AWS API + * Documentation + */ + public AddPermissionResponse addPermission(AddPermissionRequest addPermissionRequest) + throws AwsServiceException, SdkClientException, OverLimitException { + + return amazonSqsToBeExtended.addPermission(addPermissionRequest); + } + + /** + *

+ * Returns a list of your queues in the current region. The response includes a maximum of 1,000 results. If you + * specify a value for the optional QueueNamePrefix parameter, only queues with a name that begins with + * the specified value are returned. + *

+ *

+ * The listQueues methods supports pagination. Set parameter MaxResults in the request to + * specify the maximum number of results to be returned in the response. If you do not set MaxResults, + * the response includes a maximum of 1,000 results. If you set MaxResults and there are additional + * results to display, the response includes a value for NextToken. Use NextToken as a + * parameter in your next request to listQueues to receive the next page of results. + *

+ * + *

+ * Cross-account permissions don't apply to this action. For more information, see Grant Cross-Account Permissions to a Role and a User Name in the Amazon Simple Queue Service Developer + * Guide. + *

+ *
+ * + * @return Result of the ListQueues operation returned by the service. + * @throws SdkException + * Base class for all exceptions that can be thrown by the SDK (both service and client). Can be used for + * catch all scenarios. + * @throws SdkClientException + * If any client side error occurs such as an IO related failure, failure to get credentials, etc. + * @throws SqsException + * Base class for all service exceptions. Unknown exceptions will be thrown as an instance of this type. + * @sample SqsClient.ListQueues + * @see #listQueues(ListQueuesRequest) + * @see AWS API + * Documentation + */ + public ListQueuesResponse listQueues() throws AwsServiceException, SdkClientException { + + return amazonSqsToBeExtended.listQueues(); + } + + /** {@inheritDoc} */ + @Override public ListQueueTagsResponse listQueueTags(final ListQueueTagsRequest listQueueTagsRequest) { + return amazonSqsToBeExtended.listQueueTags(listQueueTagsRequest); + } + + /** {@inheritDoc} */ + @Override public TagQueueResponse tagQueue(final TagQueueRequest tagQueueRequest) { + return amazonSqsToBeExtended.tagQueue(tagQueueRequest); + } + + /** {@inheritDoc} */ + @Override public UntagQueueResponse untagQueue(final UntagQueueRequest untagQueueRequest) { + return amazonSqsToBeExtended.untagQueue(untagQueueRequest); + } + + @Override + public String serviceName() { + return amazonSqsToBeExtended.serviceName(); + } + + @Override + public void close() { + amazonSqsToBeExtended.close(); + } +} diff --git a/demo/amazon-sqs-java-extended-client-lib/src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java b/demo/amazon-sqs-java-extended-client-lib/src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java new file mode 100644 index 0000000..039ca06 --- /dev/null +++ b/demo/amazon-sqs-java-extended-client-lib/src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java @@ -0,0 +1,315 @@ +/* + * Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.sqs.javamessaging; + +import software.amazon.awssdk.annotations.NotThreadSafe; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ObjectCannedACL; +import software.amazon.payloadoffloading.PayloadStorageConfiguration; +import software.amazon.payloadoffloading.ServerSideEncryptionStrategy; + + +/** + * Amazon SQS extended client configuration options such as Amazon S3 client, + * bucket name, and message size threshold for large-payload messages. + */ +@NotThreadSafe +public class ExtendedClientConfiguration extends PayloadStorageConfiguration { + + private boolean cleanupS3Payload = true; + private boolean useLegacyReservedAttributeName = true; + private boolean ignorePayloadNotFound = false; + + public ExtendedClientConfiguration() { + super(); + this.setPayloadSizeThreshold(SQSExtendedClientConstants.DEFAULT_MESSAGE_SIZE_THRESHOLD); + } + + public ExtendedClientConfiguration(ExtendedClientConfiguration other) { + super(other); + this.cleanupS3Payload = other.doesCleanupS3Payload(); + this.useLegacyReservedAttributeName = other.usesLegacyReservedAttributeName(); + this.ignorePayloadNotFound = other.ignoresPayloadNotFound(); + } + + /** + * Enables support for payload messages. + * @param s3 + * Amazon S3 client which is going to be used for storing + * payload messages. + * @param s3BucketName + * Name of the bucket which is going to be used for storing + * payload messages. The bucket must be already created and + * configured in s3. + * @param cleanupS3Payload + * If set to true, would handle deleting the S3 object as part + * of deleting the message from SQS queue. Otherwise, would not + * attempt to delete the object from S3. If opted to not delete S3 + * objects its the responsibility to the message producer to handle + * the clean up appropriately. + */ + public void setPayloadSupportEnabled(S3Client s3, String s3BucketName, boolean cleanupS3Payload) { + setPayloadSupportEnabled(s3, s3BucketName); + this.cleanupS3Payload = cleanupS3Payload; + } + + /** + * Enables support for payload messages. + * @param s3 + * Amazon S3 client which is going to be used for storing + * payload messages. + * @param s3BucketName + * Name of the bucket which is going to be used for storing + * payload messages. The bucket must be already created and + * configured in s3. + * @param cleanupS3Payload + * If set to true, would handle deleting the S3 object as part + * of deleting the message from SQS queue. Otherwise, would not + * attempt to delete the object from S3. If opted to not delete S3 + * objects its the responsibility to the message producer to handle + * the clean up appropriately. + */ + public ExtendedClientConfiguration withPayloadSupportEnabled(S3Client s3, String s3BucketName, boolean cleanupS3Payload) { + setPayloadSupportEnabled(s3, s3BucketName, cleanupS3Payload); + return this; + } + + /** + * Disables the utilization legacy payload attribute name when sending messages. + */ + public void setLegacyReservedAttributeNameDisabled() { + this.useLegacyReservedAttributeName = false; + } + + /** + * Disables the utilization legacy payload attribute name when sending messages. + */ + public ExtendedClientConfiguration withLegacyReservedAttributeNameDisabled() { + setLegacyReservedAttributeNameDisabled(); + return this; + } + + /** + * Sets whether or not messages should be removed from Amazon SQS + * when payloads are not found in Amazon S3. + * + * @param ignorePayloadNotFound + * Whether or not messages should be removed from Amazon SQS + * when payloads are not found in Amazon S3. Default: false + */ + public void setIgnorePayloadNotFound(boolean ignorePayloadNotFound) { + this.ignorePayloadNotFound = ignorePayloadNotFound; + } + + /** + * Sets whether or not messages should be removed from Amazon SQS + * when payloads are not found in Amazon S3. + * + * @param ignorePayloadNotFound + * Whether or not messages should be removed from Amazon SQS + * when payloads are not found in Amazon S3. Default: false + * @return the updated ExtendedClientConfiguration object. + */ + public ExtendedClientConfiguration withIgnorePayloadNotFound(boolean ignorePayloadNotFound) { + setIgnorePayloadNotFound(ignorePayloadNotFound); + return this; + } + + /** + * Checks whether or not clean up large objects in S3 is enabled. + * + * @return True if clean up is enabled when deleting the concerning SQS message. + * Default: true + */ + public boolean doesCleanupS3Payload() { + return cleanupS3Payload; + } + + /** + * Checks whether or not the configuration uses the legacy reserved attribute name. + * + * @return True if legacy reserved attribute name is used. + * Default: true + */ + + public boolean usesLegacyReservedAttributeName() { + return useLegacyReservedAttributeName; + } + + /** + * Checks whether or not messages should be removed from Amazon SQS + * when payloads are not found in Amazon S3. + * + * @return True if messages should be removed from Amazon SQS + * when payloads are not found in Amazon S3. Default: false + */ + public boolean ignoresPayloadNotFound() { + return ignorePayloadNotFound; + } + + @Override + public ExtendedClientConfiguration withAlwaysThroughS3(boolean alwaysThroughS3) { + setAlwaysThroughS3(alwaysThroughS3); + return this; + } + + @Override + public ExtendedClientConfiguration withPayloadSupportEnabled(S3Client s3, String s3BucketName) { + this.setPayloadSupportEnabled(s3, s3BucketName); + return this; + } + + @Override + public ExtendedClientConfiguration withObjectCannedACL(ObjectCannedACL objectCannedACL) { + this.setObjectCannedACL(objectCannedACL); + return this; + } + + @Override + public ExtendedClientConfiguration withPayloadSizeThreshold(int payloadSizeThreshold) { + this.setPayloadSizeThreshold(payloadSizeThreshold); + return this; + } + + @Override + public ExtendedClientConfiguration withPayloadSupportDisabled() { + this.setPayloadSupportDisabled(); + return this; + } + + @Override + public ExtendedClientConfiguration withServerSideEncryption(ServerSideEncryptionStrategy serverSideEncryption) { + this.setServerSideEncryptionStrategy(serverSideEncryption); + return this; + } + + /** + * Enables support for large-payload messages. + * + * @param s3 + * Amazon S3 client which is going to be used for storing + * large-payload messages. + * @param s3BucketName + * Name of the bucket which is going to be used for storing + * large-payload messages. The bucket must be already created and + * configured in s3. + * + * @deprecated Instead use {@link #setPayloadSupportEnabled(S3Client, String, boolean)} + */ + @Deprecated + public void setLargePayloadSupportEnabled(S3Client s3, String s3BucketName) { + this.setPayloadSupportEnabled(s3, s3BucketName); + } + + /** + * Enables support for large-payload messages. + * + * @param s3 + * Amazon S3 client which is going to be used for storing + * large-payload messages. + * @param s3BucketName + * Name of the bucket which is going to be used for storing + * large-payload messages. The bucket must be already created and + * configured in s3. + * @return the updated ExtendedClientConfiguration object. + * + * @deprecated Instead use {@link #withPayloadSupportEnabled(S3Client, String)} + */ + @Deprecated + public ExtendedClientConfiguration withLargePayloadSupportEnabled(S3Client s3, String s3BucketName) { + setLargePayloadSupportEnabled(s3, s3BucketName); + return this; + } + + /** + * Disables support for large-payload messages. + * + * @deprecated Instead use {@link #setPayloadSupportDisabled()} + */ + @Deprecated + public void setLargePayloadSupportDisabled() { + this.setPayloadSupportDisabled(); + } + + /** + * Disables support for large-payload messages. + * @return the updated ExtendedClientConfiguration object. + * + * @deprecated Instead use {@link #withPayloadSupportDisabled()} + */ + @Deprecated + public ExtendedClientConfiguration withLargePayloadSupportDisabled() { + setLargePayloadSupportDisabled(); + return this; + } + + /** + * Check if the support for large-payload message if enabled. + * @return true if support for large-payload messages is enabled. + * + * @deprecated Instead use {@link #isPayloadSupportEnabled()} + */ + @Deprecated + public boolean isLargePayloadSupportEnabled() { + return isPayloadSupportEnabled(); + } + + /** + * Sets the message size threshold for storing message payloads in Amazon + * S3. + * + * @param messageSizeThreshold + * Message size threshold to be used for storing in Amazon S3. + * Default: 256KB. + * + * @deprecated Instead use {@link #setPayloadSizeThreshold(int)} + */ + @Deprecated + public void setMessageSizeThreshold(int messageSizeThreshold) { + this.setPayloadSizeThreshold(messageSizeThreshold); + } + + /** + * Sets the message size threshold for storing message payloads in Amazon + * S3. + * + * @param messageSizeThreshold + * Message size threshold to be used for storing in Amazon S3. + * Default: 256KB. + * @return the updated ExtendedClientConfiguration object. + * + * @deprecated Instead use {@link #withPayloadSizeThreshold(int)} + */ + @Deprecated + public ExtendedClientConfiguration withMessageSizeThreshold(int messageSizeThreshold) { + setMessageSizeThreshold(messageSizeThreshold); + return this; + } + + /** + * Gets the message size threshold for storing message payloads in Amazon + * S3. + * + * @return Message size threshold which is being used for storing in Amazon + * S3. Default: 256KB. + * + * @deprecated Instead use {@link #getPayloadSizeThreshold()} + */ + @Deprecated + public int getMessageSizeThreshold() { + return getPayloadSizeThreshold(); + } +} \ No newline at end of file diff --git a/demo/amazon-sqs-java-extended-client-lib/src/main/java/com/amazon/sqs/javamessaging/SQSExtendedClientConstants.java b/demo/amazon-sqs-java-extended-client-lib/src/main/java/com/amazon/sqs/javamessaging/SQSExtendedClientConstants.java new file mode 100644 index 0000000..97f1f22 --- /dev/null +++ b/demo/amazon-sqs-java-extended-client-lib/src/main/java/com/amazon/sqs/javamessaging/SQSExtendedClientConstants.java @@ -0,0 +1,34 @@ +/* + * Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.sqs.javamessaging; + + +public class SQSExtendedClientConstants { + // This constant is shared with SNSExtendedClient + // SNS team should be notified of any changes made to this + public static final String RESERVED_ATTRIBUTE_NAME = "ExtendedPayloadSize"; + + // This constant is shared with SNSExtendedClient + // SNS team should be notified of any changes made to this + public static final int MAX_ALLOWED_ATTRIBUTES = 10 - 1; // 10 for SQS, 1 for the reserved attribute + + // This constant is shared with SNSExtendedClient + // SNS team should be notified of any changes made to this + public static final int DEFAULT_MESSAGE_SIZE_THRESHOLD = 262144; + + public static final String S3_BUCKET_NAME_MARKER = "-..s3BucketName..-"; + public static final String S3_KEY_MARKER = "-..s3Key..-"; +} diff --git a/demo/amazon-sqs-java-extended-client-lib/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java b/demo/amazon-sqs-java-extended-client-lib/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java new file mode 100644 index 0000000..d2e7798 --- /dev/null +++ b/demo/amazon-sqs-java-extended-client-lib/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java @@ -0,0 +1,671 @@ +/* + * Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.sqs.javamessaging; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; +import software.amazon.awssdk.core.ApiName; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.http.AbortableInputStream; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.ObjectCannedACL; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; +import software.amazon.awssdk.utils.ImmutableMap; +import software.amazon.awssdk.utils.StringInputStream; +import software.amazon.payloadoffloading.PayloadS3Pointer; +import software.amazon.payloadoffloading.ServerSideEncryptionFactory; +import software.amazon.payloadoffloading.ServerSideEncryptionStrategy; + +import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClient.USER_AGENT_NAME; +import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClient.USER_AGENT_VERSION; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.isA; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.times; + +/** + * Tests the AmazonSQSExtendedClient class. + */ +public class AmazonSQSExtendedClientTest { + + private SqsClient extendedSqsWithDefaultConfig; + private SqsClient extendedSqsWithCustomKMS; + private SqsClient extendedSqsWithDefaultKMS; + private SqsClient extendedSqsWithGenericReservedAttributeName; + private SqsClient extendedSqsWithDeprecatedMethods; + private SqsClient mockSqsBackend; + private S3Client mockS3; + private static final String S3_BUCKET_NAME = "test-bucket-name"; + private static final String SQS_QUEUE_URL = "test-queue-url"; + private static final String S3_SERVER_SIDE_ENCRYPTION_KMS_KEY_ID = "test-customer-managed-kms-key-id"; + + private static final int LESS_THAN_SQS_SIZE_LIMIT = 3; + private static final int SQS_SIZE_LIMIT = 262144; + private static final int MORE_THAN_SQS_SIZE_LIMIT = SQS_SIZE_LIMIT + 1; + private static final ServerSideEncryptionStrategy SERVER_SIDE_ENCRYPTION_CUSTOM_STRATEGY = ServerSideEncryptionFactory.customerKey(S3_SERVER_SIDE_ENCRYPTION_KMS_KEY_ID); + private static final ServerSideEncryptionStrategy SERVER_SIDE_ENCRYPTION_DEFAULT_STRATEGY = ServerSideEncryptionFactory.awsManagedCmk(); + + // should be > 1 and << SQS_SIZE_LIMIT + private static final int ARBITRARY_SMALLER_THRESHOLD = 500; + + @Before + public void setupClients() { + mockS3 = mock(S3Client.class); + mockSqsBackend = mock(SqsClient.class); + when(mockS3.putObject(isA(PutObjectRequest.class), isA(RequestBody.class))).thenReturn(null); + + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() + .withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME); + + ExtendedClientConfiguration extendedClientConfigurationWithCustomKMS = new ExtendedClientConfiguration() + .withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME) + .withServerSideEncryption(SERVER_SIDE_ENCRYPTION_CUSTOM_STRATEGY); + + ExtendedClientConfiguration extendedClientConfigurationWithDefaultKMS = new ExtendedClientConfiguration() + .withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME) + .withServerSideEncryption(SERVER_SIDE_ENCRYPTION_DEFAULT_STRATEGY); + + ExtendedClientConfiguration extendedClientConfigurationWithGenericReservedAttributeName = new ExtendedClientConfiguration() + .withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withLegacyReservedAttributeNameDisabled(); + + ExtendedClientConfiguration extendedClientConfigurationDeprecated = new ExtendedClientConfiguration() + .withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME); + + extendedSqsWithDefaultConfig = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration)); + extendedSqsWithCustomKMS = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfigurationWithCustomKMS)); + extendedSqsWithDefaultKMS = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfigurationWithDefaultKMS)); + extendedSqsWithGenericReservedAttributeName = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfigurationWithGenericReservedAttributeName)); + extendedSqsWithDeprecatedMethods = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfigurationDeprecated)); + } + + @Test + public void testWhenSendMessageWithLargePayloadSupportDisabledThenS3IsNotUsedAndSqsBackendIsResponsibleToFailItWithDeprecatedMethod() { + int messageLength = MORE_THAN_SQS_SIZE_LIMIT; + String messageBody = generateStringWithLength(messageLength); + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() + .withLargePayloadSupportDisabled(); + SqsClient sqsExtended = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration)); + + SendMessageRequest messageRequest = SendMessageRequest.builder() + .queueUrl(SQS_QUEUE_URL) + .messageBody(messageBody) + .overrideConfiguration( + AwsRequestOverrideConfiguration.builder() + .addApiName(ApiName.builder().name(USER_AGENT_NAME).version(USER_AGENT_VERSION).build()) + .build()) + .build(); + sqsExtended.sendMessage(messageRequest); + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); + + verify(mockS3, never()).putObject(isA(PutObjectRequest.class), isA(RequestBody.class)); + verify(mockSqsBackend).sendMessage(argumentCaptor.capture()); + assertEquals(messageRequest.queueUrl(), argumentCaptor.getValue().queueUrl()); + assertEquals(messageRequest.messageBody(), argumentCaptor.getValue().messageBody()); + assertEquals(messageRequest.overrideConfiguration().get().apiNames().get(0).name(), argumentCaptor.getValue().overrideConfiguration().get().apiNames().get(0).name()); + assertEquals(messageRequest.overrideConfiguration().get().apiNames().get(0).version(), argumentCaptor.getValue().overrideConfiguration().get().apiNames().get(0).version()); + } + + @Test + public void testWhenSendMessageWithAlwaysThroughS3AndMessageIsSmallThenItIsStillStoredInS3WithDeprecatedMethod() { + int messageLength = LESS_THAN_SQS_SIZE_LIMIT; + String messageBody = generateStringWithLength(messageLength); + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() + .withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withAlwaysThroughS3(true); + SqsClient sqsExtended = spy(new AmazonSQSExtendedClient(mock(SqsClient.class), extendedClientConfiguration)); + + SendMessageRequest messageRequest = SendMessageRequest.builder().queueUrl(SQS_QUEUE_URL).messageBody(messageBody).build(); + sqsExtended.sendMessage(messageRequest); + + verify(mockS3, times(1)).putObject(isA(PutObjectRequest.class), isA(RequestBody.class)); + } + + @Test + public void testWhenSendMessageWithSetMessageSizeThresholdThenThresholdIsHonoredWithDeprecatedMethod() { + int messageLength = ARBITRARY_SMALLER_THRESHOLD * 2; + String messageBody = generateStringWithLength(messageLength); + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() + .withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withMessageSizeThreshold(ARBITRARY_SMALLER_THRESHOLD); + + SqsClient sqsExtended = spy(new AmazonSQSExtendedClient(mock(SqsClient.class), extendedClientConfiguration)); + + SendMessageRequest messageRequest = SendMessageRequest.builder().queueUrl(SQS_QUEUE_URL).messageBody(messageBody).build(); + sqsExtended.sendMessage(messageRequest); + verify(mockS3, times(1)).putObject(isA(PutObjectRequest.class), isA(RequestBody.class)); + } + + @Test + public void testReceiveMessageMultipleTimesDoesNotAdditionallyAlterReceiveMessageRequestWithDeprecatedMethod() { + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() + .withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME); + SqsClient sqsExtended = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration)); + when(mockSqsBackend.receiveMessage(isA(ReceiveMessageRequest.class))).thenReturn(ReceiveMessageResponse.builder().build()); + + ReceiveMessageRequest messageRequest = ReceiveMessageRequest.builder().build(); + ReceiveMessageRequest expectedRequest = ReceiveMessageRequest.builder().build(); + + sqsExtended.receiveMessage(messageRequest); + assertEquals(expectedRequest, messageRequest); + + sqsExtended.receiveMessage(messageRequest); + assertEquals(expectedRequest, messageRequest); + } + + @Test + public void testWhenSendLargeMessageThenPayloadIsStoredInS3() { + String messageBody = generateStringWithLength(MORE_THAN_SQS_SIZE_LIMIT); + + SendMessageRequest messageRequest = SendMessageRequest.builder().queueUrl(SQS_QUEUE_URL).messageBody(messageBody).build(); + extendedSqsWithDefaultConfig.sendMessage(messageRequest); + + verify(mockS3, times(1)).putObject(isA(PutObjectRequest.class), isA(RequestBody.class)); + } + + @Test + public void testWhenSendLargeMessage_WithoutKMS_ThenPayloadIsStoredInS3AndKMSKeyIdIsNotUsed() { + String messageBody = generateStringWithLength(MORE_THAN_SQS_SIZE_LIMIT); + + SendMessageRequest messageRequest = SendMessageRequest.builder().queueUrl(SQS_QUEUE_URL).messageBody(messageBody).build(); + extendedSqsWithDefaultConfig.sendMessage(messageRequest); + + ArgumentCaptor putObjectRequestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); + ArgumentCaptor requestBodyArgumentCaptor = ArgumentCaptor.forClass(RequestBody.class); + verify(mockS3, times(1)).putObject(putObjectRequestArgumentCaptor.capture(), requestBodyArgumentCaptor.capture()); + + Assert.assertNull(putObjectRequestArgumentCaptor.getValue().serverSideEncryption()); + assertEquals(putObjectRequestArgumentCaptor.getValue().bucket(), S3_BUCKET_NAME); + } + + @Test + public void testWhenSendLargeMessage_WithCustomKMS_ThenPayloadIsStoredInS3AndCorrectKMSKeyIdIsNotUsed() { + String messageBody = generateStringWithLength(MORE_THAN_SQS_SIZE_LIMIT); + + SendMessageRequest messageRequest = SendMessageRequest.builder().queueUrl(SQS_QUEUE_URL).messageBody(messageBody).build(); + extendedSqsWithCustomKMS.sendMessage(messageRequest); + + ArgumentCaptor putObjectRequestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); + ArgumentCaptor requestBodyArgumentCaptor = ArgumentCaptor.forClass(RequestBody.class); + verify(mockS3, times(1)).putObject(putObjectRequestArgumentCaptor.capture(), requestBodyArgumentCaptor.capture()); + + assertEquals(putObjectRequestArgumentCaptor.getValue().ssekmsKeyId(), S3_SERVER_SIDE_ENCRYPTION_KMS_KEY_ID); + assertEquals(putObjectRequestArgumentCaptor.getValue().bucket(), S3_BUCKET_NAME); + } + + @Test + public void testWhenSendLargeMessage_WithDefaultKMS_ThenPayloadIsStoredInS3AndCorrectKMSKeyIdIsNotUsed() { + String messageBody = generateStringWithLength(MORE_THAN_SQS_SIZE_LIMIT); + + SendMessageRequest messageRequest = SendMessageRequest.builder().queueUrl(SQS_QUEUE_URL).messageBody(messageBody).build(); + extendedSqsWithDefaultKMS.sendMessage(messageRequest); + + ArgumentCaptor putObjectRequestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); + ArgumentCaptor requestBodyArgumentCaptor = ArgumentCaptor.forClass(RequestBody.class); + verify(mockS3, times(1)).putObject(putObjectRequestArgumentCaptor.capture(), requestBodyArgumentCaptor.capture()); + + Assert.assertTrue(putObjectRequestArgumentCaptor.getValue().serverSideEncryption() != null && + putObjectRequestArgumentCaptor.getValue().ssekmsKeyId() == null); + assertEquals(putObjectRequestArgumentCaptor.getValue().bucket(), S3_BUCKET_NAME); + } + + @Test + public void testSendLargeMessageWithDefaultConfigThenLegacyReservedAttributeNameIsUsed(){ + String messageBody = generateStringWithLength(MORE_THAN_SQS_SIZE_LIMIT); + SendMessageRequest messageRequest = SendMessageRequest.builder().queueUrl(SQS_QUEUE_URL).messageBody(messageBody).build(); + extendedSqsWithDefaultConfig.sendMessage(messageRequest); + + ArgumentCaptor sendMessageRequestCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); + verify(mockSqsBackend).sendMessage(sendMessageRequestCaptor.capture()); + + Map attributes = sendMessageRequestCaptor.getValue().messageAttributes(); + Assert.assertTrue(attributes.containsKey(AmazonSQSExtendedClient.LEGACY_RESERVED_ATTRIBUTE_NAME)); + Assert.assertFalse(attributes.containsKey(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME)); + + } + + @Test + public void testSendLargeMessageWithGenericReservedAttributeNameConfigThenGenericReservedAttributeNameIsUsed(){ + String messageBody = generateStringWithLength(MORE_THAN_SQS_SIZE_LIMIT); + SendMessageRequest messageRequest = SendMessageRequest.builder().queueUrl(SQS_QUEUE_URL).messageBody(messageBody).build(); + extendedSqsWithGenericReservedAttributeName.sendMessage(messageRequest); + + ArgumentCaptor sendMessageRequestCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); + verify(mockSqsBackend).sendMessage(sendMessageRequestCaptor.capture()); + + Map attributes = sendMessageRequestCaptor.getValue().messageAttributes(); + Assert.assertTrue(attributes.containsKey(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME)); + Assert.assertFalse(attributes.containsKey(AmazonSQSExtendedClient.LEGACY_RESERVED_ATTRIBUTE_NAME)); + } + + @Test + public void testWhenSendSmallMessageThenS3IsNotUsed() { + String messageBody = generateStringWithLength(SQS_SIZE_LIMIT); + + SendMessageRequest messageRequest = SendMessageRequest.builder().queueUrl(SQS_QUEUE_URL).messageBody(messageBody).build(); + extendedSqsWithDefaultConfig.sendMessage(messageRequest); + + verify(mockS3, never()).putObject(isA(PutObjectRequest.class), isA(RequestBody.class)); + } + + @Test + public void testWhenSendMessageWithLargePayloadSupportDisabledThenS3IsNotUsedAndSqsBackendIsResponsibleToFailIt() { + String messageBody = generateStringWithLength(MORE_THAN_SQS_SIZE_LIMIT); + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() + .withPayloadSupportDisabled(); + SqsClient sqsExtended = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration)); + + SendMessageRequest messageRequest = SendMessageRequest.builder() + .queueUrl(SQS_QUEUE_URL) + .messageBody(messageBody) + .overrideConfiguration( + AwsRequestOverrideConfiguration.builder() + .addApiName(ApiName.builder().name(USER_AGENT_NAME).version(USER_AGENT_VERSION).build()) + .build()) + .build(); + sqsExtended.sendMessage(messageRequest); + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); + + verify(mockS3, never()).putObject(isA(PutObjectRequest.class), isA(RequestBody.class)); + verify(mockSqsBackend).sendMessage(argumentCaptor.capture()); + assertEquals(messageRequest.queueUrl(), argumentCaptor.getValue().queueUrl()); + assertEquals(messageRequest.messageBody(), argumentCaptor.getValue().messageBody()); + assertEquals(messageRequest.overrideConfiguration().get().apiNames().get(0).name(), argumentCaptor.getValue().overrideConfiguration().get().apiNames().get(0).name()); + assertEquals(messageRequest.overrideConfiguration().get().apiNames().get(0).version(), argumentCaptor.getValue().overrideConfiguration().get().apiNames().get(0).version()); + } + + @Test + public void testWhenSendMessageWithAlwaysThroughS3AndMessageIsSmallThenItIsStillStoredInS3() { + String messageBody = generateStringWithLength(LESS_THAN_SQS_SIZE_LIMIT); + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() + .withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withAlwaysThroughS3(true); + SqsClient sqsExtended = spy(new AmazonSQSExtendedClient(mock(SqsClient.class), extendedClientConfiguration)); + + SendMessageRequest messageRequest = SendMessageRequest.builder().queueUrl(SQS_QUEUE_URL).messageBody(messageBody).build(); + sqsExtended.sendMessage(messageRequest); + + verify(mockS3, times(1)).putObject(isA(PutObjectRequest.class), isA(RequestBody.class)); + } + + @Test + public void testWhenSendMessageWithSetMessageSizeThresholdThenThresholdIsHonored() { + int messageLength = ARBITRARY_SMALLER_THRESHOLD * 2; + String messageBody = generateStringWithLength(messageLength); + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() + .withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withPayloadSizeThreshold(ARBITRARY_SMALLER_THRESHOLD); + + SqsClient sqsExtended = spy(new AmazonSQSExtendedClient(mock(SqsClient.class), extendedClientConfiguration)); + + SendMessageRequest messageRequest = SendMessageRequest.builder().queueUrl(SQS_QUEUE_URL).messageBody(messageBody).build(); + sqsExtended.sendMessage(messageRequest); + verify(mockS3, times(1)).putObject(isA(PutObjectRequest.class), isA(RequestBody.class)); + } + + @Test + public void testReceiveMessageMultipleTimesDoesNotAdditionallyAlterReceiveMessageRequest() { + when(mockSqsBackend.receiveMessage(isA(ReceiveMessageRequest.class))).thenReturn(ReceiveMessageResponse.builder().build()); + + ReceiveMessageRequest messageRequest = ReceiveMessageRequest.builder().build(); + + ReceiveMessageRequest expectedRequest = ReceiveMessageRequest.builder().build(); + + extendedSqsWithDefaultConfig.receiveMessage(messageRequest); + assertEquals(expectedRequest, messageRequest); + + extendedSqsWithDefaultConfig.receiveMessage(messageRequest); + assertEquals(expectedRequest, messageRequest); + } + + @Test + public void testReceiveMessage_when_MessageIsLarge_legacyReservedAttributeUsed() throws Exception { + testReceiveMessage_when_MessageIsLarge(AmazonSQSExtendedClient.LEGACY_RESERVED_ATTRIBUTE_NAME); + } + + @Test + public void testReceiveMessage_when_MessageIsLarge_ReservedAttributeUsed() throws Exception { + testReceiveMessage_when_MessageIsLarge(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME); + } + + @Test + public void testReceiveMessage_when_MessageIsSmall() throws Exception { + String expectedMessageAttributeName = "AnyMessageAttribute"; + String expectedMessage = "SmallMessage"; + Message message = Message.builder() + .messageAttributes(ImmutableMap.of(expectedMessageAttributeName, MessageAttributeValue.builder().build())) + .body(expectedMessage) + .build(); + when(mockSqsBackend.receiveMessage(isA(ReceiveMessageRequest.class))).thenReturn(ReceiveMessageResponse.builder().messages(message).build()); + + ReceiveMessageRequest messageRequest = ReceiveMessageRequest.builder().build(); + ReceiveMessageResponse actualReceiveMessageResponse = extendedSqsWithDefaultConfig.receiveMessage(messageRequest); + Message actualMessage = actualReceiveMessageResponse.messages().get(0); + + assertEquals(expectedMessage, actualMessage.body()); + Assert.assertTrue(actualMessage.messageAttributes().containsKey(expectedMessageAttributeName)); + Assert.assertFalse(actualMessage.messageAttributes().keySet().containsAll(AmazonSQSExtendedClient.RESERVED_ATTRIBUTE_NAMES)); + verifyZeroInteractions(mockS3); + } + + @Test + public void testWhenMessageBatchIsSentThenOnlyMessagesLargerThanThresholdAreStoredInS3() { + // This creates 10 messages, out of which only two are below the threshold (100K and 200K), + // and the other 8 are above the threshold + + int[] messageLengthForCounter = new int[] { + 100_000, + 300_000, + 400_000, + 500_000, + 600_000, + 700_000, + 800_000, + 900_000, + 200_000, + 1000_000 + }; + + List batchEntries = new ArrayList(); + for (int i = 0; i < 10; i++) { + int messageLength = messageLengthForCounter[i]; + String messageBody = generateStringWithLength(messageLength); + SendMessageBatchRequestEntry entry = SendMessageBatchRequestEntry.builder() + .id("entry_" + i) + .messageBody(messageBody) + .build(); + batchEntries.add(entry); + } + + SendMessageBatchRequest batchRequest = SendMessageBatchRequest.builder().queueUrl(SQS_QUEUE_URL).entries(batchEntries).build(); + extendedSqsWithDefaultConfig.sendMessageBatch(batchRequest); + + // There should be 8 puts for the 8 messages above the threshold + verify(mockS3, times(8)).putObject(isA(PutObjectRequest.class), isA(RequestBody.class)); + } + + @Test + public void testWhenMessageBatchIsLargeS3PointerIsCorrectlySentToSQSAndNotOriginalMessage() { + String messageBody = generateStringWithLength(LESS_THAN_SQS_SIZE_LIMIT); + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() + .withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withAlwaysThroughS3(true); + + SqsClient sqsExtended = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration)); + + List batchEntries = new ArrayList(); + for (int i = 0; i < 10; i++) { + SendMessageBatchRequestEntry entry = SendMessageBatchRequestEntry.builder() + .id("entry_" + i) + .messageBody(messageBody) + .build(); + batchEntries.add(entry); + } + SendMessageBatchRequest batchRequest = SendMessageBatchRequest.builder().queueUrl(SQS_QUEUE_URL).entries(batchEntries).build(); + + sqsExtended.sendMessageBatch(batchRequest); + + ArgumentCaptor sendMessageRequestCaptor = ArgumentCaptor.forClass(SendMessageBatchRequest.class); + verify(mockSqsBackend).sendMessageBatch(sendMessageRequestCaptor.capture()); + + for (SendMessageBatchRequestEntry entry : sendMessageRequestCaptor.getValue().entries()) { + assertNotEquals(messageBody, entry.messageBody()); + } + } + + @Test + public void testWhenSmallMessageIsSentThenNoAttributeIsAdded() { + String messageBody = generateStringWithLength(LESS_THAN_SQS_SIZE_LIMIT); + + SendMessageRequest messageRequest = SendMessageRequest.builder().queueUrl(SQS_QUEUE_URL).messageBody(messageBody).build(); + extendedSqsWithDefaultConfig.sendMessage(messageRequest); + + ArgumentCaptor sendMessageRequestCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); + verify(mockSqsBackend).sendMessage(sendMessageRequestCaptor.capture()); + + Map attributes = sendMessageRequestCaptor.getValue().messageAttributes(); + Assert.assertTrue(attributes.isEmpty()); + } + + @Test + public void testWhenLargeMessageIsSentThenAttributeWithPayloadSizeIsAdded() { + int messageLength = MORE_THAN_SQS_SIZE_LIMIT; + String messageBody = generateStringWithLength(messageLength); + + SendMessageRequest messageRequest = SendMessageRequest.builder().queueUrl(SQS_QUEUE_URL).messageBody(messageBody).build(); + extendedSqsWithDefaultConfig.sendMessage(messageRequest); + + ArgumentCaptor sendMessageRequestCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); + verify(mockSqsBackend).sendMessage(sendMessageRequestCaptor.capture()); + + Map attributes = sendMessageRequestCaptor.getValue().messageAttributes(); + assertEquals("Number", attributes.get(AmazonSQSExtendedClient.LEGACY_RESERVED_ATTRIBUTE_NAME).dataType()); + assertEquals(messageLength, (int) Integer.parseInt(attributes.get(AmazonSQSExtendedClient.LEGACY_RESERVED_ATTRIBUTE_NAME).stringValue())); + } + + @Test + public void testDefaultExtendedClientDeletesSmallMessage() { + // given + String receiptHandle = UUID.randomUUID().toString(); + DeleteMessageRequest deleteRequest = DeleteMessageRequest.builder().queueUrl(SQS_QUEUE_URL).receiptHandle(receiptHandle).build(); + + // when + extendedSqsWithDefaultConfig.deleteMessage(deleteRequest); + + // then + ArgumentCaptor deleteRequestCaptor = ArgumentCaptor.forClass(DeleteMessageRequest.class); + verify(mockSqsBackend).deleteMessage(deleteRequestCaptor.capture()); + assertEquals(receiptHandle, deleteRequestCaptor.getValue().receiptHandle()); + verifyZeroInteractions(mockS3); + } + + @Test + public void testDefaultExtendedClientDeletesObjectS3UponMessageDelete() { + // given + String randomS3Key = UUID.randomUUID().toString(); + String originalReceiptHandle = UUID.randomUUID().toString(); + String largeMessageReceiptHandle = getLargeReceiptHandle(randomS3Key, originalReceiptHandle); + DeleteMessageRequest deleteRequest = DeleteMessageRequest.builder().queueUrl(SQS_QUEUE_URL).receiptHandle(largeMessageReceiptHandle).build(); + + // when + extendedSqsWithDefaultConfig.deleteMessage(deleteRequest); + + // then + ArgumentCaptor deleteRequestCaptor = ArgumentCaptor.forClass(DeleteMessageRequest.class); + verify(mockSqsBackend).deleteMessage(deleteRequestCaptor.capture()); + assertEquals(originalReceiptHandle, deleteRequestCaptor.getValue().receiptHandle()); + DeleteObjectRequest deleteObjectRequest = DeleteObjectRequest.builder().bucket(S3_BUCKET_NAME).key(randomS3Key).build(); + verify(mockS3).deleteObject(eq(deleteObjectRequest)); + } + + @Test + public void testExtendedClientConfiguredDoesNotDeleteObjectFromS3UponDelete() { + // given + String randomS3Key = UUID.randomUUID().toString(); + String originalReceiptHandle = UUID.randomUUID().toString(); + String largeMessageReceiptHandle = getLargeReceiptHandle(randomS3Key, originalReceiptHandle); + DeleteMessageRequest deleteRequest = DeleteMessageRequest.builder().queueUrl(SQS_QUEUE_URL).receiptHandle(largeMessageReceiptHandle).build(); + + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() + .withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME, false); + + SqsClient extendedSqs = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration)); + + // when + extendedSqs.deleteMessage(deleteRequest); + + // then + ArgumentCaptor deleteRequestCaptor = ArgumentCaptor.forClass(DeleteMessageRequest.class); + verify(mockSqsBackend).deleteMessage(deleteRequestCaptor.capture()); + assertEquals(originalReceiptHandle, deleteRequestCaptor.getValue().receiptHandle()); + verifyZeroInteractions(mockS3); + } + + @Test + public void testExtendedClientConfiguredDoesNotDeletesObjectsFromS3UponDeleteBatch() { + // given + int batchSize = 10; + List originalReceiptHandles = IntStream.range(0, batchSize) + .mapToObj(i -> UUID.randomUUID().toString()) + .collect(Collectors.toList()); + DeleteMessageBatchRequest deleteBatchRequest = generateLargeDeleteBatchRequest(originalReceiptHandles); + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() + .withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME, false); + SqsClient extendedSqs = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration)); + + // when + extendedSqs.deleteMessageBatch(deleteBatchRequest); + + // then + ArgumentCaptor deleteBatchRequestCaptor = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); + verify(mockSqsBackend, times(1)).deleteMessageBatch(deleteBatchRequestCaptor.capture()); + DeleteMessageBatchRequest request = deleteBatchRequestCaptor.getValue(); + assertEquals(originalReceiptHandles.size(), request.entries().size()); + IntStream.range(0, originalReceiptHandles.size()).forEach(i -> assertEquals( + originalReceiptHandles.get(i), + request.entries().get(i).receiptHandle())); + verifyZeroInteractions(mockS3); + } + + @Test + public void testDefaultExtendedClientDeletesObjectsFromS3UponDeleteBatch() { + // given + int batchSize = 10; + List originalReceiptHandles = IntStream.range(0, batchSize) + .mapToObj(i -> UUID.randomUUID().toString()) + .collect(Collectors.toList()); + DeleteMessageBatchRequest deleteBatchRequest = generateLargeDeleteBatchRequest(originalReceiptHandles); + + // when + extendedSqsWithDefaultConfig.deleteMessageBatch(deleteBatchRequest); + + // then + ArgumentCaptor deleteBatchRequestCaptor = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); + verify(mockSqsBackend, times(1)).deleteMessageBatch(deleteBatchRequestCaptor.capture()); + DeleteMessageBatchRequest request = deleteBatchRequestCaptor.getValue(); + assertEquals(originalReceiptHandles.size(), request.entries().size()); + IntStream.range(0, originalReceiptHandles.size()).forEach(i -> assertEquals( + originalReceiptHandles.get(i), + request.entries().get(i).receiptHandle())); + verify(mockS3, times(batchSize)).deleteObject(any(DeleteObjectRequest.class)); + } + + @Test + public void testWhenSendMessageWIthCannedAccessControlListDefined() { + ObjectCannedACL expected = ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL; + String messageBody = generateStringWithLength(MORE_THAN_SQS_SIZE_LIMIT); + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() + .withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withObjectCannedACL(expected); + SqsClient sqsExtended = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration)); + + SendMessageRequest messageRequest = SendMessageRequest.builder().queueUrl(SQS_QUEUE_URL).messageBody(messageBody).build(); + sqsExtended.sendMessage(messageRequest); + + ArgumentCaptor captor = ArgumentCaptor.forClass(PutObjectRequest.class); + + verify(mockS3).putObject(captor.capture(), any(RequestBody.class)); + + assertEquals(expected, captor.getValue().acl()); + } + + private void testReceiveMessage_when_MessageIsLarge(String reservedAttributeName) throws Exception { + String pointer = new PayloadS3Pointer(S3_BUCKET_NAME, "S3Key").toJson(); + Message message = Message.builder() + .messageAttributes(ImmutableMap.of(reservedAttributeName, MessageAttributeValue.builder().build())) + .body(pointer) + .build(); + String expectedMessage = "LargeMessage"; + GetObjectRequest getObjectRequest = GetObjectRequest.builder() + .bucket(S3_BUCKET_NAME) + .key("S3Key") + .build(); + + ResponseInputStream s3Object = new ResponseInputStream(GetObjectResponse.builder().build(), AbortableInputStream.create(new StringInputStream(expectedMessage))); +// S3Object s3Object = S3Object.builder().build(); +// s3Object.setObjectContent(new StringInputStream(expectedMessage)); + when(mockSqsBackend.receiveMessage(isA(ReceiveMessageRequest.class))).thenReturn( + ReceiveMessageResponse.builder().messages(message).build()); + when(mockS3.getObject(isA(GetObjectRequest.class))).thenReturn(s3Object); + + ReceiveMessageRequest messageRequest = ReceiveMessageRequest.builder().build(); + ReceiveMessageResponse actualReceiveMessageResponse = extendedSqsWithDefaultConfig.receiveMessage(messageRequest); + Message actualMessage = actualReceiveMessageResponse.messages().get(0); + + assertEquals(expectedMessage, actualMessage.body()); + Assert.assertFalse(actualMessage.messageAttributes().keySet().containsAll(AmazonSQSExtendedClient.RESERVED_ATTRIBUTE_NAMES)); + verify(mockS3, times(1)).getObject(isA(GetObjectRequest.class)); + } + + private DeleteMessageBatchRequest generateLargeDeleteBatchRequest(List originalReceiptHandles) { + List deleteEntries = IntStream.range(0, originalReceiptHandles.size()) + .mapToObj(i -> DeleteMessageBatchRequestEntry.builder() + .id(Integer.toString(i)) + .receiptHandle(getSampleLargeReceiptHandle(originalReceiptHandles.get(i))) + .build()) + .collect(Collectors.toList()); + + return DeleteMessageBatchRequest.builder().queueUrl(SQS_QUEUE_URL).entries(deleteEntries).build(); + } + + private String getLargeReceiptHandle(String s3Key, String originalReceiptHandle) { + return SQSExtendedClientConstants.S3_BUCKET_NAME_MARKER + S3_BUCKET_NAME + + SQSExtendedClientConstants.S3_BUCKET_NAME_MARKER + SQSExtendedClientConstants.S3_KEY_MARKER + + s3Key + SQSExtendedClientConstants.S3_KEY_MARKER + originalReceiptHandle; + } + + private String getSampleLargeReceiptHandle(String originalReceiptHandle) { + return getLargeReceiptHandle(UUID.randomUUID().toString(), originalReceiptHandle); + } + + private String generateStringWithLength(int messageLength) { + char[] charArray = new char[messageLength]; + Arrays.fill(charArray, 'x'); + return new String(charArray); + } + +} diff --git a/demo/amazon-sqs-java-extended-client-lib/src/test/java/com/amazon/sqs/javamessaging/ExtendedClientConfigurationTest.java b/demo/amazon-sqs-java-extended-client-lib/src/test/java/com/amazon/sqs/javamessaging/ExtendedClientConfigurationTest.java new file mode 100644 index 0000000..df5e0c5 --- /dev/null +++ b/demo/amazon-sqs-java-extended-client-lib/src/test/java/com/amazon/sqs/javamessaging/ExtendedClientConfigurationTest.java @@ -0,0 +1,169 @@ +/* + * Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.sqs.javamessaging; + +import org.junit.Assert; +import org.junit.Test; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.payloadoffloading.ServerSideEncryptionFactory; +import software.amazon.payloadoffloading.ServerSideEncryptionStrategy; + +import static org.mockito.Mockito.*; + +/** + * Tests the ExtendedClientConfiguration class. + */ +public class ExtendedClientConfigurationTest { + + private static String s3BucketName = "test-bucket-name"; + private static String s3ServerSideEncryptionKMSKeyId = "test-customer-managed-kms-key-id"; + private static ServerSideEncryptionStrategy serverSideEncryptionStrategy = ServerSideEncryptionFactory.customerKey(s3ServerSideEncryptionKMSKeyId); + + @Test + public void testCopyConstructor() { + S3Client s3 = mock(S3Client.class); + + boolean alwaysThroughS3 = true; + int messageSizeThreshold = 500; + boolean doesCleanupS3Payload = false; + + ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration(); + + extendedClientConfig.withPayloadSupportEnabled(s3, s3BucketName, doesCleanupS3Payload) + .withAlwaysThroughS3(alwaysThroughS3).withPayloadSizeThreshold(messageSizeThreshold) + .withServerSideEncryption(serverSideEncryptionStrategy); + + ExtendedClientConfiguration newExtendedClientConfig = new ExtendedClientConfiguration(extendedClientConfig); + + Assert.assertEquals(s3, newExtendedClientConfig.getS3Client()); + Assert.assertEquals(s3BucketName, newExtendedClientConfig.getS3BucketName()); + Assert.assertEquals(serverSideEncryptionStrategy, newExtendedClientConfig.getServerSideEncryptionStrategy()); + Assert.assertTrue(newExtendedClientConfig.isPayloadSupportEnabled()); + Assert.assertEquals(doesCleanupS3Payload, newExtendedClientConfig.doesCleanupS3Payload()); + Assert.assertEquals(alwaysThroughS3, newExtendedClientConfig.isAlwaysThroughS3()); + Assert.assertEquals(messageSizeThreshold, newExtendedClientConfig.getPayloadSizeThreshold()); + + Assert.assertNotSame(newExtendedClientConfig, extendedClientConfig); + } + + @Test + public void testLargePayloadSupportEnabledWithDefaultDeleteFromS3Config() { + S3Client s3 = mock(S3Client.class); + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration(); + extendedClientConfiguration.setPayloadSupportEnabled(s3, s3BucketName); + + Assert.assertTrue(extendedClientConfiguration.isPayloadSupportEnabled()); + Assert.assertTrue(extendedClientConfiguration.doesCleanupS3Payload()); + Assert.assertNotNull(extendedClientConfiguration.getS3Client()); + Assert.assertEquals(s3BucketName, extendedClientConfiguration.getS3BucketName()); + + } + + @Test + public void testLargePayloadSupportEnabledWithDeleteFromS3Enabled() { + + S3Client s3 = mock(S3Client.class); + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration(); + extendedClientConfiguration.setPayloadSupportEnabled(s3, s3BucketName, true); + + Assert.assertTrue(extendedClientConfiguration.isPayloadSupportEnabled()); + Assert.assertTrue(extendedClientConfiguration.doesCleanupS3Payload()); + Assert.assertNotNull(extendedClientConfiguration.getS3Client()); + Assert.assertEquals(s3BucketName, extendedClientConfiguration.getS3BucketName()); + } + + @Test + public void testLargePayloadSupportEnabledWithDeleteFromS3Disabled() { + S3Client s3 = mock(S3Client.class); + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration(); + extendedClientConfiguration.setPayloadSupportEnabled(s3, s3BucketName, false); + + Assert.assertTrue(extendedClientConfiguration.isPayloadSupportEnabled()); + Assert.assertFalse(extendedClientConfiguration.doesCleanupS3Payload()); + Assert.assertNotNull(extendedClientConfiguration.getS3Client()); + Assert.assertEquals(s3BucketName, extendedClientConfiguration.getS3BucketName()); + } + + @Test + public void testCopyConstructorDeprecated() { + + S3Client s3 = mock(S3Client.class); +// when(s3.putObject(isA(PutObjectRequest.class))).thenReturn(null); + + boolean alwaysThroughS3 = true; + int messageSizeThreshold = 500; + + ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration(); + + extendedClientConfig.withLargePayloadSupportEnabled(s3, s3BucketName) + .withAlwaysThroughS3(alwaysThroughS3).withMessageSizeThreshold(messageSizeThreshold); + + ExtendedClientConfiguration newExtendedClientConfig = new ExtendedClientConfiguration(extendedClientConfig); + + Assert.assertEquals(s3, newExtendedClientConfig.getS3Client()); + Assert.assertEquals(s3BucketName, newExtendedClientConfig.getS3BucketName()); + Assert.assertTrue(newExtendedClientConfig.isLargePayloadSupportEnabled()); + Assert.assertEquals(alwaysThroughS3, newExtendedClientConfig.isAlwaysThroughS3()); + Assert.assertEquals(messageSizeThreshold, newExtendedClientConfig.getMessageSizeThreshold()); + + Assert.assertNotSame(newExtendedClientConfig, extendedClientConfig); + } + + @Test + public void testLargePayloadSupportEnabled() { + + S3Client s3 = mock(S3Client.class); +// when(s3.putObject(isA(PutObjectRequest.class))).thenReturn(null); + + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration(); + extendedClientConfiguration.setLargePayloadSupportEnabled(s3, s3BucketName); + + Assert.assertTrue(extendedClientConfiguration.isLargePayloadSupportEnabled()); + Assert.assertNotNull(extendedClientConfiguration.getS3Client()); + Assert.assertEquals(s3BucketName, extendedClientConfiguration.getS3BucketName()); + + } + + @Test + public void testDisableLargePayloadSupport() { + + S3Client s3 = mock(S3Client.class); +// when(s3.putObject(isA(PutObjectRequest.class))).thenReturn(null); + + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration(); + extendedClientConfiguration.setLargePayloadSupportDisabled(); + + Assert.assertNull(extendedClientConfiguration.getS3Client()); + Assert.assertNull(extendedClientConfiguration.getS3BucketName()); + +// verify(s3, never()).putObject(isA(PutObjectRequest.class)); + } + + @Test + public void testMessageSizeThreshold() { + + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration(); + + Assert.assertEquals(SQSExtendedClientConstants.DEFAULT_MESSAGE_SIZE_THRESHOLD, + extendedClientConfiguration.getMessageSizeThreshold()); + + int messageLength = 1000; + extendedClientConfiguration.setMessageSizeThreshold(messageLength); + Assert.assertEquals(messageLength, extendedClientConfiguration.getMessageSizeThreshold()); + + } +} diff --git a/demo/clean.sh b/demo/clean.sh new file mode 100644 index 0000000..4aca0b0 --- /dev/null +++ b/demo/clean.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +function clean { + rm -rf inline-tests + rm -rf amazon-sqs-java-extended-client-lib/target + rm -rf amazon-sqs-java-extended-client-lib/deps.txt + ( + cd amazon-sqs-java-extended-client-lib + mvn clean + ) + rm -rf out.txt + rm -rf .DS_Store +} + +clean &> /dev/null diff --git a/demo/diff.txt b/demo/diff.txt new file mode 100644 index 0000000..f9a0047 --- /dev/null +++ b/demo/diff.txt @@ -0,0 +1,38 @@ +diff --git a/pom.xml b/pom.xml +index 7c4fb2a..53a9ce2 100644 +--- a/pom.xml ++++ b/pom.xml +@@ -77,6 +77,11 @@ + 3.12.4 + test + ++ ++ org.inlinetest ++ inlinetest ++ 1.0 ++ + + + +diff --git a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java +index 7520706..1fdd5f4 100644 +--- a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java ++++ b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java +@@ -76,6 +76,9 @@ import software.amazon.payloadoffloading.S3BackedPayloadStore; + import software.amazon.payloadoffloading.S3Dao; + import software.amazon.payloadoffloading.Util; + ++import org.inlinetest.ITest; ++import static org.inlinetest.ITest.itest; ++import static org.inlinetest.ITest.group; + + /** + * Amazon SQS Extended Client extends the functionality of Amazon SQS client. +@@ -883,6 +886,7 @@ public class AmazonSQSExtendedClient extends AmazonSQSExtendedClientBase impleme + private String getOrigReceiptHandle(String receiptHandle) { + int secondOccurence = receiptHandle.indexOf(SQSExtendedClientConstants.S3_KEY_MARKER, + receiptHandle.indexOf(SQSExtendedClientConstants.S3_KEY_MARKER) + 1); ++ itest().given(receiptHandle, "1FOc=-..s3Key..-y*-@T-..s3Key..-").checkEq(secondOccurence, 21); + return receiptHandle.substring(secondOccurence + SQSExtendedClientConstants.S3_KEY_MARKER.length()); + } + diff --git a/demo/inlinetest-1.0.jar b/demo/inlinetest-1.0.jar new file mode 100644 index 0000000..9a59c65 Binary files /dev/null and b/demo/inlinetest-1.0.jar differ diff --git a/demo/junit-platform-console-standalone-1.12.0.jar b/demo/junit-platform-console-standalone-1.12.0.jar new file mode 100644 index 0000000..91d6b3e Binary files /dev/null and b/demo/junit-platform-console-standalone-1.12.0.jar differ diff --git a/demo/run.sh b/demo/run.sh new file mode 100644 index 0000000..2c1b0c4 --- /dev/null +++ b/demo/run.sh @@ -0,0 +1,58 @@ +#!/bin/bash + +ASSERTION_STYLE=$1 + +PROJECT_NAME="amazon-sqs-java-extended-client-lib" +ITEST_JAR="inlinetest-1.0.jar" +JUNIT_JAR="junit-platform-console-standalone-1.12.0.jar" + +function install_inlinetest { + echo "===== Installing inlinetest =====" + mvn install:install-file -Dfile=${ITEST_JAR} -DgroupId=org.inlinetest -DartifactId=inlinetest -Dversion=1.0 -Dpackaging=jar --no-transfer-progress +} + +function get_dependencies_and_compile { + echo "===== Collecting dependencies and compiling =====" + ( + cd "${PROJECT_NAME}" + mvn dependency:build-classpath -Dmdep.outputFile=deps.txt --no-transfer-progress + mvn clean compile --no-transfer-progress + ) +} + +function parse_inline_test { + echo "===== Parsing inline test =====" + local output_dir="inline-tests/src" + mkdir -p "${output_dir}" + java -cp "${ITEST_JAR}" org.inlinetest.InlineTestRunnerSourceCode --input_file=${PROJECT_NAME}/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java --assertion_style="${ASSERTION_STYLE}" --output_dir="${output_dir}" --multiple_test_classes=true --dep_file_path=${PROJECT_NAME}/deps.txt --app_src_path=${PROJECT_NAME}/src/main/java +} + +function compile_inline_test { + echo "===== Compiling inline test =====" + local input_dir="inline-tests/src" + local output_dir="inline-tests/bin" + mkdir -p "${output_dir}" + javac -cp "$(cat ${PROJECT_NAME}/deps.txt)":${ITEST_JAR}:${input_dir}:${PROJECT_NAME}/target/classes -d "${output_dir}" ${input_dir}/*.java +} + +function execute_inline_test { + echo "===== Executing inline test =====" + if [ "${ASSERTION_STYLE}" == "junit" ]; then + java -jar "${JUNIT_JAR}" -cp "$(cat ${PROJECT_NAME}/deps.txt | sed 's|target/test-classes||g')":inline-tests/bin --select-package com.amazon.sqs.javamessaging + elif [ "${ASSERTION_STYLE}" == "assert" ]; then + java -cp "${PROJECT_NAME}/target/classes:inline-tests/bin" com.amazon.sqs.javamessaging.AmazonSQSExtendedClient_0Test + else + echo "Invalid assertion style: ${ASSERTION_STYLE}" + exit 1 + fi +} + +function main { + install_inlinetest + get_dependencies_and_compile + parse_inline_test + compile_inline_test + execute_inline_test +} + +main &> out.txt