Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions awss3-storage/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@ dependencies {

implementation "com.amazonaws:aws-java-sdk-s3:${revAwsSdk}"
implementation "org.apache.commons:commons-lang3"
implementation 'software.amazon.awssdk:s3:2.20.146'
implementation 'software.amazon.awssdk:sts:2.20.146'
implementation 'com.amazonaws:aws-java-sdk-sts:1.12.782'
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.netflix.conductor.core.utils.IDGenerator;
import com.netflix.conductor.s3.storage.S3PayloadStorage;

import com.amazonaws.auth.WebIdentityTokenCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

Expand All @@ -41,6 +42,11 @@ public ExternalPayloadStorage s3ExternalPayloadStorage(
matchIfMissing = true)
@Bean
public AmazonS3 amazonS3(S3Properties properties) {
if (properties.getWebIdentity()) {
return AmazonS3ClientBuilder.standard()
.withCredentials(WebIdentityTokenCredentialsProvider.builder().build())
.build();
}
return AmazonS3ClientBuilder.standard().withRegion(properties.getRegion()).build();
// TODO: Add localstack support to test locally
// return AmazonS3ClientBuilder.standard()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class S3Properties {
/** The s3 bucket name where the payloads will be stored */
private String bucketName = "conductor_payloads";

private Boolean webIdentity = false;

/** The time (in seconds) for which the signed url will be valid */
@DurationUnit(ChronoUnit.SECONDS)
private Duration signedUrlExpirationDuration = Duration.ofSeconds(5);
Expand Down Expand Up @@ -55,6 +57,9 @@ public void setRegion(String region) {
this.region = region;
}

public Boolean getWebIdentity() {
return webIdentity;
}
// TODO: Add localstack support to test locally
// private String endpoint = "http://s3.localhost.localstack.cloud:4566";
//
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
CREATE OR REPLACE PROCEDURE public.conductor_archive(IN archival_date date)
LANGUAGE plpgsql
AS $procedure$
BEGIN

--CREATING TEMP TABLE FOR TASK IDs
CREATE TEMP TABLE temp_task_ids ON COMMIT DROP AS
SELECT task_id FROM task WHERE created_on < archival_date;

ALTER TABLE temp_task_ids ADD PRIMARY KEY (task_id);
ANALYZE temp_task_ids;

--CREATING TEMP TABLE FOR WORKFLOW IDs
CREATE TEMP TABLE temp_workflow_ids ON COMMIT DROP AS
SELECT workflow_id FROM workflow WHERE created_on < archival_date;

ALTER TABLE temp_workflow_ids ADD PRIMARY KEY (workflow_id);
ANALYZE temp_workflow_ids;

--CREATING TEMP TABLE FOR temp_workflow_def_to_workflow IDs
CREATE TEMP TABLE temp_workflow_def_to_workflow_ids ON COMMIT DROP AS
SELECT wdt.workflow_id
FROM workflow_def_to_workflow wdt
JOIN temp_workflow_ids tw ON tw.workflow_id = wdt.workflow_id;

ALTER TABLE temp_workflow_def_to_workflow_ids ADD PRIMARY KEY (workflow_id);
ANALYZE temp_workflow_def_to_workflow_ids;

--CREATING TEMP TABLES FOR workflow_to_task IDs
CREATE TEMP TABLE temp_workflow_to_task_ids ON COMMIT DROP AS
SELECT w.task_id
FROM workflow_to_task w
JOIN temp_task_ids t ON t.task_id = w.task_id;

ALTER TABLE temp_workflow_to_task_ids ADD PRIMARY KEY(task_id);
ANALYZE temp_workflow_to_task_ids;

--CREATING TEMP TABLES FOR task_scheduled IDs
CREATE TEMP TABLE temp_task_scheduled_ids ON COMMIT DROP AS
SELECT ts.task_id
FROM task_scheduled ts
JOIN temp_task_ids t ON t.task_id = ts.task_id;

ALTER TABLE temp_task_scheduled_ids ADD PRIMARY KEY(task_id);
ANALYZE temp_task_scheduled_ids;

DELETE FROM task t USING temp_task_ids tti WHERE t.task_id = tti.task_id;
DELETE FROM workflow w USING temp_workflow_ids t WHERE w.workflow_id = t.workflow_id;
DELETE FROM workflow_def_to_workflow w USING temp_workflow_def_to_workflow_ids t WHERE w.workflow_id = t.workflow_id;
DELETE FROM workflow_to_task w USING temp_workflow_to_task_ids t WHERE w.task_id = t.task_id;
DELETE FROM task_scheduled t USING temp_task_scheduled_ids tts WHERE t.task_id = tts.task_id;
DELETE FROM event_execution WHERE created_on < archival_date;
END;
$procedure$
;
Loading