diff --git a/awss3-storage/build.gradle b/awss3-storage/build.gradle index 57e9d4fc3a..350ae8bd97 100644 --- a/awss3-storage/build.gradle +++ b/awss3-storage/build.gradle @@ -18,4 +18,7 @@ dependencies { implementation "com.amazonaws:aws-java-sdk-s3:${revAwsSdk}" implementation "org.apache.commons:commons-lang3" + implementation 'software.amazon.awssdk:s3:2.20.146' + implementation 'software.amazon.awssdk:sts:2.20.146' + implementation 'com.amazonaws:aws-java-sdk-sts:1.12.782' } diff --git a/awss3-storage/src/main/java/com/netflix/conductor/s3/config/S3Configuration.java b/awss3-storage/src/main/java/com/netflix/conductor/s3/config/S3Configuration.java index 106f61664d..44dc8a43ed 100644 --- a/awss3-storage/src/main/java/com/netflix/conductor/s3/config/S3Configuration.java +++ b/awss3-storage/src/main/java/com/netflix/conductor/s3/config/S3Configuration.java @@ -21,6 +21,7 @@ import com.netflix.conductor.core.utils.IDGenerator; import com.netflix.conductor.s3.storage.S3PayloadStorage; +import com.amazonaws.auth.WebIdentityTokenCredentialsProvider; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; @@ -41,6 +42,11 @@ public ExternalPayloadStorage s3ExternalPayloadStorage( matchIfMissing = true) @Bean public AmazonS3 amazonS3(S3Properties properties) { + if (properties.getWebIdentity()) { + return AmazonS3ClientBuilder.standard() + .withCredentials(WebIdentityTokenCredentialsProvider.builder().build()) + .build(); + } return AmazonS3ClientBuilder.standard().withRegion(properties.getRegion()).build(); // TODO: Add localstack support to test locally // return AmazonS3ClientBuilder.standard() diff --git a/awss3-storage/src/main/java/com/netflix/conductor/s3/config/S3Properties.java b/awss3-storage/src/main/java/com/netflix/conductor/s3/config/S3Properties.java index f399cd0d7d..df28f94c30 100644 --- a/awss3-storage/src/main/java/com/netflix/conductor/s3/config/S3Properties.java +++ b/awss3-storage/src/main/java/com/netflix/conductor/s3/config/S3Properties.java @@ -24,6 +24,8 @@ public class S3Properties { /** The s3 bucket name where the payloads will be stored */ private String bucketName = "conductor_payloads"; + private Boolean webIdentity = false; + /** The time (in seconds) for which the signed url will be valid */ @DurationUnit(ChronoUnit.SECONDS) private Duration signedUrlExpirationDuration = Duration.ofSeconds(5); @@ -55,6 +57,9 @@ public void setRegion(String region) { this.region = region; } + public Boolean getWebIdentity() { + return webIdentity; + } // TODO: Add localstack support to test locally // private String endpoint = "http://s3.localhost.localstack.cloud:4566"; // diff --git a/postgres-persistence/src/main/resources/db/migration_postgres/V9__archival_procedure.sql b/postgres-persistence/src/main/resources/db/migration_postgres/V9__archival_procedure.sql new file mode 100644 index 0000000000..83ae483c42 --- /dev/null +++ b/postgres-persistence/src/main/resources/db/migration_postgres/V9__archival_procedure.sql @@ -0,0 +1,55 @@ +CREATE OR REPLACE PROCEDURE public.conductor_archive(IN archival_date date) + LANGUAGE plpgsql +AS $procedure$ +BEGIN + + --CREATING TEMP TABLE FOR TASK IDs + CREATE TEMP TABLE temp_task_ids ON COMMIT DROP AS + SELECT task_id FROM task WHERE created_on < archival_date; + + ALTER TABLE temp_task_ids ADD PRIMARY KEY (task_id); + ANALYZE temp_task_ids; + + --CREATING TEMP TABLE FOR WORKFLOW IDs + CREATE TEMP TABLE temp_workflow_ids ON COMMIT DROP AS + SELECT workflow_id FROM workflow WHERE created_on < archival_date; + + ALTER TABLE temp_workflow_ids ADD PRIMARY KEY (workflow_id); + ANALYZE temp_workflow_ids; + + --CREATING TEMP TABLE FOR temp_workflow_def_to_workflow IDs + CREATE TEMP TABLE temp_workflow_def_to_workflow_ids ON COMMIT DROP AS + SELECT wdt.workflow_id + FROM workflow_def_to_workflow wdt + JOIN temp_workflow_ids tw ON tw.workflow_id = wdt.workflow_id; + + ALTER TABLE temp_workflow_def_to_workflow_ids ADD PRIMARY KEY (workflow_id); + ANALYZE temp_workflow_def_to_workflow_ids; + + --CREATING TEMP TABLES FOR workflow_to_task IDs + CREATE TEMP TABLE temp_workflow_to_task_ids ON COMMIT DROP AS + SELECT w.task_id + FROM workflow_to_task w + JOIN temp_task_ids t ON t.task_id = w.task_id; + + ALTER TABLE temp_workflow_to_task_ids ADD PRIMARY KEY(task_id); + ANALYZE temp_workflow_to_task_ids; + + --CREATING TEMP TABLES FOR task_scheduled IDs + CREATE TEMP TABLE temp_task_scheduled_ids ON COMMIT DROP AS + SELECT ts.task_id + FROM task_scheduled ts + JOIN temp_task_ids t ON t.task_id = ts.task_id; + + ALTER TABLE temp_task_scheduled_ids ADD PRIMARY KEY(task_id); + ANALYZE temp_task_scheduled_ids; + + DELETE FROM task t USING temp_task_ids tti WHERE t.task_id = tti.task_id; + DELETE FROM workflow w USING temp_workflow_ids t WHERE w.workflow_id = t.workflow_id; + DELETE FROM workflow_def_to_workflow w USING temp_workflow_def_to_workflow_ids t WHERE w.workflow_id = t.workflow_id; + DELETE FROM workflow_to_task w USING temp_workflow_to_task_ids t WHERE w.task_id = t.task_id; + DELETE FROM task_scheduled t USING temp_task_scheduled_ids tts WHERE t.task_id = tts.task_id; + DELETE FROM event_execution WHERE created_on < archival_date; +END; +$procedure$ +;