Skip to content

Commit eeaf390

Browse files
authored
feat(intake): add waiters for Intake Runner, Intake and Intake User (#3655)
1 parent 13249a5 commit eeaf390

File tree

10 files changed

+799
-1
lines changed

10 files changed

+799
-1
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@
2929
- `postgresflex`: [v1.3.0](services/postgresflex/CHANGELOG.md#v130)
3030
- **Breaking Change:** The attribute type for `PartialUpdateInstancePayload` and `UpdateInstancePayload` changed from `Storage` to `StorageUpdate`.
3131
- **Deprecation:** `StorageUpdate`: updating the performance class field is not possible.
32+
- `intake`: [v0.3.0](services/intake/CHANGELOG.md#v030)
33+
- **Feature:** Add wait handlers for `Intake`, `IntakeRunner`, and `IntakeUser` resources.
34+
- **Improvement:** Add usage examples for the `intake` service.
35+
3236
- `iaas`:
3337
- [v1.0.1](services/iaas/CHANGELOG.md#v101)
3438
- Bump STACKIT resourcemanager SDK module from `v0.17.1` to `v0.18.0`

examples/intake/go.mod

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
module github.com/stackitcloud/stackit-sdk-go/examples/intake
2+
3+
go 1.21
4+
5+
require (
6+
github.com/stackitcloud/stackit-sdk-go/core v0.17.3
7+
github.com/stackitcloud/stackit-sdk-go/services/intake v0.2.0
8+
)
9+
10+
require (
11+
github.com/golang-jwt/jwt/v5 v5.3.0 // indirect
12+
github.com/google/uuid v1.6.0 // indirect
13+
)

examples/intake/go.sum

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo=
2+
github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
3+
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
4+
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
5+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
6+
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
7+
github.com/stackitcloud/stackit-sdk-go/core v0.17.3 h1:GsZGmRRc/3GJLmCUnsZswirr5wfLRrwavbnL/renOqg=
8+
github.com/stackitcloud/stackit-sdk-go/core v0.17.3/go.mod h1:HBCXJGPgdRulplDzhrmwC+Dak9B/x0nzNtmOpu+1Ahg=
9+
github.com/stackitcloud/stackit-sdk-go/services/intake v0.2.0 h1:p/zi4VPoCQWk7/2ubi3hxsqiaye41x/Pl3GXYbPkYOY=
10+
github.com/stackitcloud/stackit-sdk-go/services/intake v0.2.0/go.mod h1:jOArPjNRkwv4487+9ab3dRG+lM09leu5FiRohbQs9Z4=

examples/intake/intake.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
8+
sdkConfig "github.com/stackitcloud/stackit-sdk-go/core/config"
9+
"github.com/stackitcloud/stackit-sdk-go/core/utils"
10+
"github.com/stackitcloud/stackit-sdk-go/services/intake"
11+
)
12+
13+
func main() {
14+
region := "eu01" // Region where the resources will be created
15+
projectId := "PROJECT_ID" // Your STACKIT project ID
16+
17+
dremioCatalogURI := "DREMIO_CATALOG_URI" //nolint:gosec // E.g., "https://my-dremio-catalog.data-platform.stackit.run/iceberg"
18+
dremioTokenEndpoint := "DREMIO_TOKEN_ENDPOINT" //nolint:gosec // E.g., "https://my-dremio.data-platform.stackit.run/oauth/token"
19+
dremioPAT := "DREMIO_PERSONAL_ACCESS_TOKEN" //nolint:gosec // Your Dremio Personal Access Token
20+
catalogWarehouse := "CATALOG_WAREHOUSE" //nolint:gosec // Catalog warehouse where the data will be ingested
21+
22+
intakeUserPassword := "s3cuRe_p@ssW0rd_f0r_1ntake!" //nolint:gosec // A secure password for the new intake user
23+
24+
ctx := context.Background()
25+
26+
intakeClient, err := intake.NewAPIClient(
27+
sdkConfig.WithRegion(region),
28+
)
29+
if err != nil {
30+
fmt.Fprintf(os.Stderr, "Creating API client: %v\n", err)
31+
os.Exit(1)
32+
}
33+
34+
// Create an Intake Runner
35+
createRunnerPayload := intake.CreateIntakeRunnerPayload{
36+
DisplayName: utils.Ptr("my-example-runner"),
37+
MaxMessageSizeKiB: utils.Ptr(int64(10)),
38+
MaxMessagesPerHour: utils.Ptr(int64(1000)),
39+
}
40+
createRunnerResp, err := intakeClient.CreateIntakeRunner(ctx, projectId, region).CreateIntakeRunnerPayload(createRunnerPayload).Execute()
41+
if err != nil {
42+
fmt.Fprintf(os.Stderr, "Error creating Intake Runner: %v\n", err)
43+
os.Exit(1)
44+
}
45+
intakeRunnerId := *createRunnerResp.Id
46+
fmt.Printf("Triggered creation of Intake Runner with ID: %s. Waiting for it to become active...\n", intakeRunnerId)
47+
48+
// Create an Intake
49+
dremioAuthType := intake.CatalogAuthType("dremio") // can also be set to "none" if the catalog is not authenticated
50+
createIntakePayload := intake.CreateIntakePayload{
51+
DisplayName: utils.Ptr("my-example-intake"),
52+
IntakeRunnerId: utils.Ptr(intakeRunnerId),
53+
Catalog: &intake.IntakeCatalog{
54+
Uri: utils.Ptr(dremioCatalogURI),
55+
Warehouse: utils.Ptr(catalogWarehouse),
56+
Namespace: utils.Ptr("example_namespace"),
57+
TableName: utils.Ptr("example_table"),
58+
Auth: &intake.CatalogAuth{
59+
Type: &dremioAuthType,
60+
Dremio: &intake.DremioAuth{
61+
TokenEndpoint: utils.Ptr(dremioTokenEndpoint),
62+
PersonalAccessToken: utils.Ptr(dremioPAT),
63+
},
64+
},
65+
},
66+
}
67+
createIntakeResp, err := intakeClient.CreateIntake(ctx, projectId, region).CreateIntakePayload(createIntakePayload).Execute()
68+
if err != nil {
69+
fmt.Fprintf(os.Stderr, "Error creating Intake: %v\n", err)
70+
os.Exit(1)
71+
}
72+
intakeId := *createIntakeResp.Id
73+
fmt.Printf("Triggered creation of Intake with ID: %s. Waiting for it to become active...\n", intakeRunnerId)
74+
75+
createIntakeUserPayload := intake.CreateIntakeUserPayload{
76+
DisplayName: utils.Ptr("my-example-user"),
77+
Password: utils.Ptr(intakeUserPassword),
78+
}
79+
// Create an Intake user
80+
createIntakeUserResp, err := intakeClient.CreateIntakeUser(ctx, projectId, region, intakeId).CreateIntakeUserPayload(createIntakeUserPayload).Execute()
81+
if err != nil {
82+
fmt.Fprintf(os.Stderr, "Error creating Intake User: %v\n", err)
83+
os.Exit(1)
84+
}
85+
intakeUserId := *createIntakeUserResp.Id
86+
fmt.Printf("Created Intake User with ID: %s\n", intakeUserId)
87+
}

go.work

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use (
1010
./examples/dns
1111
./examples/errorhandling
1212
./examples/iaas
13+
./examples/intake
1314
./examples/kms
1415
./examples/loadbalancer
1516
./examples/logme

services/intake/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## v0.3.0
2+
- **Feature:** Add wait handlers for `Intake`, `IntakeRunner`, and `IntakeUser` resources.
3+
- **Improvement:** Add usage examples for the `intake` service.
4+
15
## v0.2.0
26
- **Feature:** Add response `IntakeRunnerResponse` to `UpdateIntakeRunnerExecute` request
37
- **Feature:** Add response `IntakeUserResponse` to `UpdateIntakeUserExecute` request

services/intake/VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
v0.2.0
1+
v0.3.0

services/intake/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/stackitcloud/stackit-sdk-go/services/intake
33
go 1.21
44

55
require (
6+
github.com/google/go-cmp v0.7.0
67
github.com/google/uuid v1.6.0
78
github.com/stackitcloud/stackit-sdk-go/core v0.19.0
89
)

services/intake/wait/wait.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package wait
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"net/http"
8+
"time"
9+
10+
"github.com/stackitcloud/stackit-sdk-go/core/oapierror"
11+
"github.com/stackitcloud/stackit-sdk-go/core/wait"
12+
"github.com/stackitcloud/stackit-sdk-go/services/intake"
13+
)
14+
15+
type APIClientInterface interface {
16+
GetIntakeRunnerExecute(ctx context.Context, projectId, region, intakeRunnerId string) (*intake.IntakeRunnerResponse, error)
17+
GetIntakeExecute(ctx context.Context, projectId, region, intakeId string) (*intake.IntakeResponse, error)
18+
GetIntakeUserExecute(ctx context.Context, projectId, region, intakeId, intakeUserId string) (*intake.IntakeUserResponse, error)
19+
}
20+
21+
func CreateOrUpdateIntakeRunnerWaitHandler(ctx context.Context, a APIClientInterface, projectId, region, intakeRunnerId string) *wait.AsyncActionHandler[intake.IntakeRunnerResponse] {
22+
handler := wait.New(func() (waitFinished bool, response *intake.IntakeRunnerResponse, err error) {
23+
runner, err := a.GetIntakeRunnerExecute(ctx, projectId, region, intakeRunnerId)
24+
if err != nil {
25+
return false, nil, err
26+
}
27+
28+
if runner == nil {
29+
return false, nil, fmt.Errorf("API returned a nil response for Intake Runner %s", intakeRunnerId)
30+
}
31+
32+
if runner.Id == nil || runner.State == nil {
33+
return false, nil, fmt.Errorf("could not get ID or State from response for Intake Runner %s", intakeRunnerId)
34+
}
35+
36+
if *runner.Id == intakeRunnerId && *runner.State == intake.INTAKERUNNERRESPONSESTATE_ACTIVE {
37+
return true, runner, nil
38+
}
39+
40+
// The API does not have a dedicated failure state for this resource,
41+
// so we rely on the timeout for cases where it never becomes active.
42+
return false, nil, nil
43+
})
44+
handler.SetTimeout(15 * time.Minute)
45+
return handler
46+
}
47+
48+
func DeleteIntakeRunnerWaitHandler(ctx context.Context, a APIClientInterface, projectId, region, intakeRunnerId string) *wait.AsyncActionHandler[intake.IntakeRunnerResponse] {
49+
handler := wait.New(func() (waitFinished bool, response *intake.IntakeRunnerResponse, err error) {
50+
_, err = a.GetIntakeRunnerExecute(ctx, projectId, region, intakeRunnerId)
51+
if err == nil {
52+
// Resource still exists
53+
return false, nil, nil
54+
}
55+
56+
var oapiError *oapierror.GenericOpenAPIError
57+
if errors.As(err, &oapiError) {
58+
if oapiError.StatusCode == http.StatusNotFound {
59+
// Success: Resource is gone
60+
return true, nil, nil
61+
}
62+
}
63+
// An unexpected error occurred
64+
return false, nil, err
65+
})
66+
handler.SetTimeout(15 * time.Minute)
67+
return handler
68+
}
69+
70+
func CreateOrUpdateIntakeWaitHandler(ctx context.Context, a APIClientInterface, projectId, region, intakeId string) *wait.AsyncActionHandler[intake.IntakeResponse] {
71+
handler := wait.New(func() (waitFinished bool, response *intake.IntakeResponse, err error) {
72+
ik, err := a.GetIntakeExecute(ctx, projectId, region, intakeId)
73+
if err != nil {
74+
return false, nil, err
75+
}
76+
77+
if ik == nil {
78+
return false, nil, fmt.Errorf("API returned a nil response for Intake %s", intakeId)
79+
}
80+
81+
if ik.Id == nil || ik.State == nil {
82+
return false, nil, fmt.Errorf("could not get ID or State from response for Intake %s", intakeId)
83+
}
84+
85+
if *ik.Id == intakeId && *ik.State == intake.INTAKERESPONSESTATE_ACTIVE {
86+
return true, ik, nil
87+
}
88+
89+
if *ik.Id == intakeId && *ik.State == intake.INTAKERESPONSESTATE_FAILED {
90+
return true, ik, fmt.Errorf("create/update failed for Intake %s", intakeId)
91+
}
92+
93+
return false, nil, nil
94+
})
95+
handler.SetTimeout(10 * time.Minute)
96+
return handler
97+
}
98+
99+
func DeleteIntakeWaitHandler(ctx context.Context, a APIClientInterface, projectId, region, intakeId string) *wait.AsyncActionHandler[intake.IntakeResponse] {
100+
handler := wait.New(func() (waitFinished bool, response *intake.IntakeResponse, err error) {
101+
_, err = a.GetIntakeExecute(ctx, projectId, region, intakeId)
102+
if err == nil {
103+
return false, nil, nil
104+
}
105+
106+
var oapiError *oapierror.GenericOpenAPIError
107+
if errors.As(err, &oapiError) {
108+
if oapiError.StatusCode == http.StatusNotFound {
109+
return true, nil, nil
110+
}
111+
}
112+
return false, nil, err
113+
})
114+
handler.SetTimeout(10 * time.Minute)
115+
return handler
116+
}
117+
118+
func CreateOrUpdateIntakeUserWaitHandler(ctx context.Context, a APIClientInterface, projectId, region, intakeId, intakeUserId string) *wait.AsyncActionHandler[intake.IntakeUserResponse] {
119+
handler := wait.New(func() (waitFinished bool, response *intake.IntakeUserResponse, err error) {
120+
user, err := a.GetIntakeUserExecute(ctx, projectId, region, intakeId, intakeUserId)
121+
if err != nil {
122+
return false, nil, err
123+
}
124+
125+
if user == nil {
126+
return false, nil, fmt.Errorf("API returned a nil response for Intake User %s", intakeUserId)
127+
}
128+
129+
if user.Id == nil || user.State == nil {
130+
return false, nil, fmt.Errorf("could not get ID or State from response for Intake User %s", intakeUserId)
131+
}
132+
133+
if *user.Id == intakeUserId && *user.State == intake.INTAKEUSERRESPONSESTATE_ACTIVE {
134+
return true, user, nil
135+
}
136+
137+
// The API does not have a dedicated failure state for this resource, we rely on the timeout for cases where
138+
// it never becomes active.
139+
return false, nil, nil
140+
})
141+
handler.SetTimeout(5 * time.Minute)
142+
return handler
143+
}
144+
145+
func DeleteIntakeUserWaitHandler(ctx context.Context, a APIClientInterface, projectId, region, intakeId, intakeUserId string) *wait.AsyncActionHandler[intake.IntakeUserResponse] {
146+
handler := wait.New(func() (waitFinished bool, response *intake.IntakeUserResponse, err error) {
147+
_, err = a.GetIntakeUserExecute(ctx, projectId, region, intakeId, intakeUserId)
148+
if err == nil {
149+
return false, nil, nil
150+
}
151+
152+
var oapiError *oapierror.GenericOpenAPIError
153+
if errors.As(err, &oapiError) {
154+
if oapiError.StatusCode == http.StatusNotFound {
155+
return true, nil, nil
156+
}
157+
}
158+
return false, nil, err
159+
})
160+
handler.SetTimeout(5 * time.Minute)
161+
return handler
162+
}

0 commit comments

Comments
 (0)