Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 49 additions & 15 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/spf13/cobra"

"github.com/aifoundry-org/oxide-controller/pkg/cluster"
"github.com/aifoundry-org/oxide-controller/pkg/config"
logpkg "github.com/aifoundry-org/oxide-controller/pkg/log"
oxidepkg "github.com/aifoundry-org/oxide-controller/pkg/oxide"
"github.com/aifoundry-org/oxide-controller/pkg/server"
Expand Down Expand Up @@ -163,11 +164,6 @@ func rootCmd() (*cobra.Command, error) {
oxideToken = profileToken
}

oxideConfig := &oxide.Config{
Host: oxideAPIURL,
Token: string(oxideToken),
}

if strings.HasPrefix(oxideToken, "file:") {
tokenFilePath := strings.TrimPrefix(oxideToken, "file:")
oxideToken = ""
Expand Down Expand Up @@ -216,18 +212,56 @@ func rootCmd() (*cobra.Command, error) {
cmd.SilenceUsage = true

ctx := context.Background()

c := cluster.New(logentry, oxideConfig, clusterProject,
controlPlanePrefix, workerPrefix, int(controlPlaneCount), int(workerCount),
cluster.NodeSpec{Image: cluster.Image{Name: controlPlaneImageName, Source: controlPlaneImageSource, Blocksize: controlPlaneImageBlocksize}, MemoryGB: int(controlPlaneMemory), CPUCount: int(controlPlaneCPU), ExternalIP: controlPlaneExternalIP, RootDiskSize: int(controlPlaneRootDiskSizeGB * cluster.GB), ExtraDiskSize: int(controlPlaneExtraDiskSizeGB * cluster.GB), TailscaleAuthKey: tailscaleAuthKey, TailscaleTag: tailscaleTag},
cluster.NodeSpec{Image: cluster.Image{Name: workerImageName, Source: workerImageSource, Blocksize: workerImageBlocksize}, MemoryGB: int(workerMemory), CPUCount: int(workerCPU), ExternalIP: workerExternalIP, RootDiskSize: int(workerRootDiskSizeGB * cluster.GB), ExtraDiskSize: int(workerExtraDiskSizeGB * cluster.GB), TailscaleAuthKey: tailscaleAuthKey, TailscaleTag: tailscaleTag},
imageParallelism,
controlPlaneNamespace, controlPlaneSecret, pubkey,
time.Duration(clusterInitWait)*time.Minute,
controllerConfig := &config.ControllerConfig{
UserSSHPublicKey: string(pubkey),
OxideToken: oxideToken,
OxideURL: oxideAPIURL,
ClusterProject: clusterProject,
ControlPlaneCount: controlPlaneCount,
ControlPlaneSpec: config.NodeSpec{
Image: config.Image{Name: controlPlaneImageName, Source: controlPlaneImageSource, Blocksize: controlPlaneImageBlocksize},
Prefix: controlPlanePrefix,
MemoryGB: int(controlPlaneMemory),
CPUCount: int(controlPlaneCPU),
ExternalIP: controlPlaneExternalIP,
RootDiskSize: int(controlPlaneRootDiskSizeGB * cluster.GB),
ExtraDiskSize: int(controlPlaneExtraDiskSizeGB * cluster.GB),
TailscaleAuthKey: tailscaleAuthKey,
TailscaleTag: tailscaleTag,
},
WorkerCount: workerCount,
WorkerSpec: config.NodeSpec{
Image: config.Image{Name: workerImageName, Source: workerImageSource, Blocksize: workerImageBlocksize},
Prefix: workerPrefix,
MemoryGB: int(workerMemory),
CPUCount: int(workerCPU),
ExternalIP: workerExternalIP,
RootDiskSize: int(workerRootDiskSizeGB * cluster.GB),
ExtraDiskSize: int(workerExtraDiskSizeGB * cluster.GB),
TailscaleAuthKey: tailscaleAuthKey,
TailscaleTag: tailscaleTag,
},

ControlPlaneNamespace: controlPlaneNamespace,
SecretName: controlPlaneSecret,
Address: address,
ControlLoopMins: controlLoopMins,
ImageParallelism: imageParallelism,
TailscaleAuthKey: tailscaleAuthKey,
TailscaleAPIKey: tailscaleAPIKey,
TailscaleTag: tailscaleTag,
TailscaleTailnet: tailscaleTailnet,
}
c := cluster.New(
logentry,
controllerConfig,
kubeconfigOverwrite,
tailscaleAPIKey,
tailscaleTailnet,
controllerOCIImage,
time.Duration(clusterInitWait)*time.Minute,

/*
pubkey,
*/
)
// we perform 2 execution loops of the cluster execute function:
// - the first one is to create the cluster and get the kubeconfig
Expand Down
113 changes: 47 additions & 66 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

"github.com/aifoundry-org/oxide-controller/pkg/config"
"github.com/aifoundry-org/oxide-controller/pkg/util"
"k8s.io/client-go/kubernetes"
"tailscale.com/client/tailscale/v2"
Expand All @@ -18,51 +19,36 @@ import (
)

type Cluster struct {
logger *log.Entry
oxideConfig *oxide.Config
projectID string
controlPlanePrefix string
workerPrefix string
controlPlaneCount int
clusterInitWait time.Duration
// logger
logger *log.Entry

// reusable config that should be loaded into the secret and shared, whether running locally or in-cluster
config *config.ControllerConfig

// config that is derived locally
kubeconfigOverwrite bool
// workerCount per the CLI flags; once cluster is up and running, relies solely on amount stored in secret
workerCount int
controlPlaneSpec, workerSpec NodeSpec
secretName string
namespace string
userPubkey []byte
controlPlaneIP string
imageParallelism int
tailscaleAPIKey string
tailscaleTailnet string
clientset *kubernetes.Clientset
apiConfig *Config
ociImage string
ociImage string // OCI image to use for the controller
oxideConfig *oxide.Config
clientset *kubernetes.Clientset
apiConfig *Config
projectID string // ID of the Oxide project
initWait time.Duration // time to wait for the cluster to initialize
}

// New creates a new Cluster instance
func New(logger *log.Entry, oxideConfig *oxide.Config, projectID string, controlPlanePrefix, workerPrefix string, controlPlaneCount, workerCount int, controlPlaneSpec, workerSpec NodeSpec, imageParallelism int, namespace, secretName string, pubkey []byte, clusterInitWait time.Duration, kubeconfigOverwrite bool, tailscaleAPIKey, tailscaleTailnet, OCIimage string) *Cluster {
func New(logger *log.Entry, ctrlrConfig *config.ControllerConfig, kubeconfigOverwrite bool, ociImage string, initWait time.Duration) *Cluster {
//oxideConfig *oxide.Config, projectID string, controlPlanePrefix, workerPrefix string, controlPlaneCount, workerCount int, controlPlaneSpec, workerSpec NodeSpec, imageParallelism int, namespace, secretName string, pubkey []byte, clusterInitWait time.Duration, kubeconfigOverwrite bool, tailscaleAPIKey, tailscaleTailnet, OCIimage string)
c := &Cluster{
logger: logger.WithField("component", "cluster"),
oxideConfig: oxideConfig,
projectID: projectID,
controlPlanePrefix: controlPlanePrefix,
workerPrefix: workerPrefix,
controlPlaneSpec: controlPlaneSpec,
workerSpec: workerSpec,
secretName: secretName,
namespace: namespace,
userPubkey: pubkey,
clusterInitWait: clusterInitWait,
logger: logger.WithField("component", "cluster"),
config: ctrlrConfig,
oxideConfig: &oxide.Config{
Token: ctrlrConfig.OxideToken,
Host: ctrlrConfig.OxideURL,
},
kubeconfigOverwrite: kubeconfigOverwrite,
imageParallelism: imageParallelism,
tailscaleAPIKey: tailscaleAPIKey,
tailscaleTailnet: tailscaleTailnet,
ociImage: OCIimage,
ociImage: ociImage,
initWait: initWait,
}
c.workerCount = workerCount
c.controlPlaneCount = controlPlaneCount
return c
}

Expand All @@ -74,18 +60,18 @@ func (c *Cluster) ensureClusterExists(ctx context.Context) (newKubeconfig []byte
return nil, fmt.Errorf("failed to create Oxide API client: %v", err)
}
projectID := c.projectID
controlPlanePrefix := c.controlPlanePrefix
controlPlaneCount := c.controlPlaneCount
secretName := c.secretName
controlPlanePrefix := c.config.ControlPlaneSpec.Prefix
controlPlaneCount := c.config.ControlPlaneCount
secretName := c.config.SecretName

c.logger.Debugf("Checking if control plane IP %s exists", controlPlanePrefix)
controlPlaneIP, err := c.ensureControlPlaneIP(ctx, controlPlanePrefix)
if err != nil {
return nil, fmt.Errorf("failed to get control plane IP: %w", err)
}

if c.controlPlaneIP == "" {
c.controlPlaneIP = controlPlaneIP.Ip
if c.config.ControlPlaneIP == "" {
c.config.ControlPlaneIP = controlPlaneIP.Ip
}

c.logger.Debugf("Checking if %d control plane nodes exist with prefix %s", controlPlaneCount, controlPlanePrefix)
Expand Down Expand Up @@ -146,8 +132,8 @@ func (c *Cluster) ensureClusterExists(ctx context.Context) (newKubeconfig []byte
return nil, fmt.Errorf("failed to generate SSH key pair: %w", err)
}
var pubkeyList []string
if c.userPubkey != nil {
pubkeyList = append(pubkeyList, string(c.userPubkey))
if c.config.UserSSHPublicKey != "" {
pubkeyList = append(pubkeyList, c.config.UserSSHPublicKey)
}
pubkeyList = append(pubkeyList, string(pub))
// add the public key to the node in addition to the user one
Expand All @@ -166,7 +152,7 @@ func (c *Cluster) ensureClusterExists(ctx context.Context) (newKubeconfig []byte
externalIP string
fipAttached bool
)
if c.controlPlaneSpec.ExternalIP {
if c.config.ControlPlaneSpec.ExternalIP {
c.logger.Debugf("Control plane node %s has external IP, using that", hostid)
ipList, err := client.InstanceExternalIpList(ctx, oxide.InstanceExternalIpListParams{
Instance: oxide.NameOrId(hostid),
Expand Down Expand Up @@ -215,18 +201,18 @@ func (c *Cluster) ensureClusterExists(ctx context.Context) (newKubeconfig []byte
clusterAccessIP := externalIP

// wait for the control plane node to be up and running
timeLeft := c.clusterInitWait
timeLeft := c.initWait
for {
c.logger.Infof("Waiting %s for control plane node to be up and running...", timeLeft)
sleepTime := 30 * time.Second
time.Sleep(sleepTime)
timeLeft -= sleepTime

if c.tailscaleAPIKey != "" {
if c.config.TailscaleAPIKey != "" {
c.logger.Infof("Checking if control plane node has joined tailnet")
client := &tailscale.Client{
Tailnet: c.tailscaleTailnet,
APIKey: c.tailscaleAPIKey,
Tailnet: c.config.TailscaleTailnet,
APIKey: c.config.TailscaleAPIKey,
}
ctx := context.Background()
devices, err := client.Devices().List(ctx)
Expand Down Expand Up @@ -286,16 +272,9 @@ func (c *Cluster) ensureClusterExists(ctx context.Context) (newKubeconfig []byte
return nil, fmt.Errorf("failed to run command to retrieve join token on control plane node: %w", err)
}
// save the private key and public key to the secret
secrets[secretKeySystemSSHPublic] = pub
secrets[secretKeySystemSSHPrivate] = priv
secrets[secretKeyJoinToken] = joinToken
secrets[secretKeyOxideToken] = []byte(c.oxideConfig.Token)
secrets[secretKeyOxideURL] = []byte(c.oxideConfig.Host)

// save the user ssh public key to the secrets map
if c.userPubkey != nil {
secrets[secretKeyUserSSH] = c.userPubkey
}
c.config.K3sJoinToken = string(joinToken)
c.config.SystemSSHPublicKey = string(pub)
c.config.SystemSSHPrivateKey = string(priv)

// get the kubeconfig
kubeconfig, err := util.RunSSHCommand("root", fmt.Sprintf("%s:22", clusterAccessIP), priv, "cat /etc/rancher/k3s/k3s.yaml")
Expand All @@ -310,10 +289,6 @@ func (c *Cluster) ensureClusterExists(ctx context.Context) (newKubeconfig []byte
re := regexp.MustCompile(`(server:\s*\w+://)(\d+\.\d+\.\d+\.\d+)(:\d+)`)
kubeconfigString = re.ReplaceAllString(kubeconfigString, fmt.Sprintf("${1}%s${3}", clusterAccessIP))

// if we have worker node count explicitly defined, save it
if c.workerCount > 0 {
secrets[secretKeyWorkerCount] = []byte(fmt.Sprintf("%d", c.workerCount))
}
newKubeconfig = []byte(kubeconfigString)

// get a Kubernetes client
Expand All @@ -329,12 +304,18 @@ func (c *Cluster) ensureClusterExists(ctx context.Context) (newKubeconfig []byte
c.clientset = clientset

// ensure we have the namespace we need
namespace := c.namespace
namespace := c.config.ControlPlaneNamespace
if err := createNamespace(ctx, clientset, namespace); err != nil {
return nil, fmt.Errorf("failed to create namespace: %w", err)
}

// save the join token, system ssh key pair, user ssh key to the Kubernetes secret
configJson, err := c.config.ToJSON()
if err != nil {
return nil, fmt.Errorf("failed to convert config to JSON: %w", err)
}
secrets[secretKeyConfig] = configJson

// save the config to the Kubernetes secret
c.logger.Debugf("Saving secret %s/%s to Kubernetes", namespace, secretName)
if err := saveSecret(ctx, clientset, c.logger, namespace, secretName, secrets); err != nil {
return nil, fmt.Errorf("failed to save secret: %w", err)
Expand Down
11 changes: 3 additions & 8 deletions pkg/cluster/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,9 @@ const (

blockSize = 4096

secretKeyUserSSH = "user-ssh-public-key"
secretKeyJoinToken = "k3s-join-token"
secretKeySystemSSHPublic = "system-ssh-public-key"
secretKeySystemSSHPrivate = "system-ssh-private-key"
secretKeyWorkerCount = "worker-count"
secretKeyOxideToken = "oxide-token"
secretKeyOxideURL = "oxide-url"
maximumChunkSize = 512 * KB
secretKeyConfig = "config"

maximumChunkSize = 512 * KB

devModeOCIImage = "dev"
utilityImageName = "alpine:3.21"
Expand Down
6 changes: 3 additions & 3 deletions pkg/cluster/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ func (c *Cluster) LoadControllerToClusterNodes(ctx context.Context, infile io.Re
labelKey := "app"
labelValue := "preload-binary"
labels := map[string]string{labelKey: labelValue}
if err := deployPreloadBinaryDaemonSet(c.clientset, c.namespace, preloadBinaryName, labels); err != nil {
if err := deployPreloadBinaryDaemonSet(c.clientset, c.config.ControlPlaneNamespace, preloadBinaryName, labels); err != nil {
return fmt.Errorf("deploying preload-binary DaemonSet: %w", err)
}
if err := copyToAllDaemonSetPods(c.clientset, c.apiConfig.Config, c.namespace, fmt.Sprintf("%s=%s", labelKey, labelValue), "writer", filepath.Join(containerDir, binaryName), infile); err != nil {
if err := copyToAllDaemonSetPods(c.clientset, c.apiConfig.Config, c.config.ControlPlaneNamespace, fmt.Sprintf("%s=%s", labelKey, labelValue), "writer", filepath.Join(containerDir, binaryName), infile); err != nil {
return fmt.Errorf("copying to all DaemonSet pods: %w", err)
}
if err := removePreloadBinaryDaemonSet(c.clientset, c.namespace, preloadBinaryName); err != nil {
if err := removePreloadBinaryDaemonSet(c.clientset, c.config.ControlPlaneNamespace, preloadBinaryName); err != nil {
return fmt.Errorf("removing preload-binary DaemonSet: %w", err)
}
// update our OCI image to point to the new image
Expand Down
30 changes: 15 additions & 15 deletions pkg/cluster/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ func (c *Cluster) Execute(ctx context.Context) (newKubeconfig []byte, err error)
return nil, fmt.Errorf("failed to create Oxide API client: %v", err)
}

projectID, err := ensureProjectExists(ctx, c.logger, client, c.projectID)
projectID, err := ensureProjectExists(ctx, c.logger, client, c.config.ClusterProject)
if err != nil {
return nil, fmt.Errorf("project verification failed: %v", err)
}
if projectID != "" && projectID != c.projectID {
if projectID != "" && projectID != c.config.ClusterProject {
c.projectID = projectID
c.logger.Infof("Using project ID: %s", c.projectID)
}

images, err := ensureImagesExist(ctx, c.logger, client, c.projectID, c.imageParallelism, c.controlPlaneSpec.Image, c.workerSpec.Image)
images, err := ensureImagesExist(ctx, c.logger, client, c.projectID, c.config.ImageParallelism, c.config.ControlPlaneSpec.Image, c.config.WorkerSpec.Image)
if err != nil {
return nil, fmt.Errorf("image verification failed: %v", err)
}
Expand All @@ -39,23 +39,23 @@ func (c *Cluster) Execute(ctx context.Context) (newKubeconfig []byte, err error)
c.logger.Infof("images %v", images)

// control plane image and root disk size
c.controlPlaneSpec.Image = images[0]
minSize := util.RoundUp(c.controlPlaneSpec.Image.Size, GB)
if c.controlPlaneSpec.RootDiskSize == 0 {
c.controlPlaneSpec.RootDiskSize = minSize
c.config.ControlPlaneSpec.Image = images[0]
minSize := util.RoundUp(c.config.ControlPlaneSpec.Image.Size, GB)
if c.config.ControlPlaneSpec.RootDiskSize == 0 {
c.config.ControlPlaneSpec.RootDiskSize = minSize
}
if c.controlPlaneSpec.RootDiskSize < minSize {
return nil, fmt.Errorf("control plane root disk size %d is less than minimum image size %d", c.controlPlaneSpec.RootDiskSize, minSize)
if c.config.ControlPlaneSpec.RootDiskSize < minSize {
return nil, fmt.Errorf("control plane root disk size %d is less than minimum image size %d", c.config.ControlPlaneSpec.RootDiskSize, minSize)
}

// worker image and root disk size
c.workerSpec.Image = images[1]
minSize = util.RoundUp(c.workerSpec.Image.Size, GB)
if c.workerSpec.RootDiskSize == 0 {
c.workerSpec.RootDiskSize = minSize
c.config.WorkerSpec.Image = images[1]
minSize = util.RoundUp(c.config.WorkerSpec.Image.Size, GB)
if c.config.WorkerSpec.RootDiskSize == 0 {
c.config.WorkerSpec.RootDiskSize = minSize
}
if c.workerSpec.RootDiskSize < minSize {
return nil, fmt.Errorf("worker root disk size %d is less than minimum image size %d", c.workerSpec.RootDiskSize, minSize)
if c.config.WorkerSpec.RootDiskSize < minSize {
return nil, fmt.Errorf("worker root disk size %d is less than minimum image size %d", c.config.WorkerSpec.RootDiskSize, minSize)
}

newKubeconfig, err = c.ensureClusterExists(ctx)
Expand Down
6 changes: 3 additions & 3 deletions pkg/cluster/helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ func (c *Cluster) LoadHelmCharts(ctx context.Context) error {

values := map[string]interface{}{
"createNamespace": false,
"namespace": c.namespace,
"secretName": c.secretName,
"namespace": c.config.ControlPlaneNamespace,
"secretName": c.config.SecretName,
"useHostBinary": useHostBinary,
"verbose": logpkg.GetFlag(c.logger.Logger.Level), // set logging in the controller to the same as our level
"image": map[string]interface{}{
Expand All @@ -67,7 +67,7 @@ func (c *Cluster) LoadHelmCharts(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to load chart files: %w", err)
}
rel, err := installOrUpgradeEmbeddedChart(chartFiles, c.namespace, c.apiConfig.Config, values)
rel, err := installOrUpgradeEmbeddedChart(chartFiles, c.config.ControlPlaneNamespace, c.apiConfig.Config, values)
if err != nil {
return fmt.Errorf("failed to install/upgrade helm chart: %w", err)
}
Expand Down
Loading