diff --git a/pkg/apiserver/controller/job/create.go b/pkg/apiserver/controller/job/create.go index 6d40f279e..490649e16 100644 --- a/pkg/apiserver/controller/job/create.go +++ b/pkg/apiserver/controller/job/create.go @@ -104,8 +104,27 @@ func validateJob(ctx *logger.RequestContext, request *CreateJobInfo) error { } if len(request.ExtensionTemplate) != 0 { - // extension template from user - ctx.Logging().Infof("request ExtensionTemplate is not empty, pass validate members") + // validate extension template from user + if len(request.Members) == 0 { + ctx.Logging().Infof("request ExtensionTemplate pass validate nil members") + return nil + } + // todo validateMembers using these function + // members not nil, continue validate + if err := validateMembersRole(ctx, request); err != nil { + ctx.Logging().Errorf("validate members role failed, err: %v", err) + return err + } + // validate scheduleInfo in members + if err := validateMembersScheduleInfo(ctx, request); err != nil { + ctx.Logging().Errorf("validate members role failed, err: %v", err) + return err + } + // validate resource in members + if err := validateMembersResource(ctx, request); err != nil { + ctx.Logging().Errorf("validate members role failed, err: %v", err) + return err + } } else { // validate members if err := validateJobMembers(ctx, request); err != nil { @@ -116,6 +135,58 @@ func validateJob(ctx *logger.RequestContext, request *CreateJobInfo) error { return nil } +// validateScheduleInfo include +func validateMembersScheduleInfo(ctx *logger.RequestContext, request *CreateJobInfo) error { + var err error + // validate queue + for _, member := range request.Members { + if err = validateMembersQueue(ctx, &member, request.SchedulingPolicy); err != nil { + ctx.Logging().Errorf("Failed to check Members' Queue: %v", err) + return err + } + // check members priority + if err = checkPriority(&member.SchedulingPolicy, &request.SchedulingPolicy); err != nil { + ctx.Logging().Errorf("Failed to check priority: %v", err) + return err + } + } + return nil +} + +func validateMembersResource(ctx *logger.RequestContext, request *CreateJobInfo) error { + var err error + sumResource := resources.EmptyResource() + for index, member := range request.Members { + member.Flavour, err = flavour.GetFlavourWithCheck(member.Flavour) + if err != nil { + log.Errorf("get flavour failed, err:%v", err) + return err + } + request.Members[index].Flavour.ResourceInfo = member.Flavour.ResourceInfo + memberRes, err := resources.NewResourceFromMap(member.Flavour.ResourceInfo.ToMap()) + if err != nil { + ctx.Logging().Errorf("Failed to multiply replicas=%d and resourceInfo=%v, err: %v", member.Replicas, member.Flavour.ResourceInfo, err) + ctx.ErrorCode = common.JobInvalidField + return err + } + ctx.Logging().Debugf("member resource info %v", member.Flavour.ResourceInfo) + if memberRes.CPU() == 0 || memberRes.Memory() == 0 { + err = fmt.Errorf("flavour[%v] cpu or memory is empty", memberRes) + ctx.Logging().Errorf("Failed to check flavour: %v", err) + return err + } + memberRes.Multi(member.Replicas) + sumResource.Add(memberRes) + } + // validate queue and total-member-resource + if !sumResource.LessEqual(request.SchedulingPolicy.MaxResources) { + errMsg := fmt.Sprintf("the flavour[%+v] is larger than queue's [%+v]", sumResource, request.SchedulingPolicy.MaxResources) + ctx.Logging().Errorf(errMsg) + return fmt.Errorf(errMsg) + } + return nil +} + func validateCommonJobInfo(ctx *logger.RequestContext, requestCommonJobInfo *CommonJobInfo) error { // validate job id if requestCommonJobInfo.ID != "" { @@ -146,6 +217,28 @@ func validateCommonJobInfo(ctx *logger.RequestContext, requestCommonJobInfo *Com return nil } +func validateMembersRole(ctx *logger.RequestContext, request *CreateJobInfo) error { + log.Infof("validate job %s MembersRole", request.Name) + frameworkRoles := getFrameworkRoles(request.Framework) + for _, member := range request.Members { + memberRole := schema.MemberRole(member.Role) + _, find := frameworkRoles[memberRole] + if !find { + err := fmt.Errorf("the role[%s] for framework %s is not supported", member.Role, request.Framework) + ctx.Logging().Errorf("Failed to check Members' role, err: %v", err) + return err + } + frameworkRoles[memberRole] = frameworkRoles[memberRole] + member.Replicas + } + var err error + request.Mode, err = checkMemberRole(request.Framework, frameworkRoles) + if err != nil { + ctx.Logging().Errorf("check member role for framework %s failed, err: %v", request.Framework, err) + return err + } + return nil +} + func validateJobMembers(ctx *logger.RequestContext, request *CreateJobInfo) error { if len(request.Members) == 0 { err := fmt.Errorf("request.Members is empty") @@ -487,9 +580,10 @@ func buildJob(request *CreateJobInfo) (*model.Job, error) { var members []schema.Member var templateJson string var err error - if len(request.ExtensionTemplate) == 0 { + if len(request.Members) != 0 { members = buildMembers(request) - } else { + } + if len(request.ExtensionTemplate) != 0 { templateJson, err = newExtensionTemplateJson(request.ExtensionTemplate) if err != nil { log.Errorf("parse extension template failed, err: %v", err) diff --git a/pkg/apiserver/controller/job/create_test.go b/pkg/apiserver/controller/job/create_test.go index fdafda5c0..94f44d29c 100644 --- a/pkg/apiserver/controller/job/create_test.go +++ b/pkg/apiserver/controller/job/create_test.go @@ -91,7 +91,69 @@ func TestCreatePFJob(t *testing.T) { responseCode: 400, }, { - name: "create success request", + name: "create pod success request", + args: args{ + ctx: &logger.RequestContext{ + UserName: mockRootUser, + }, + req: &CreateJobInfo{ + Members: []MemberSpec{ + { + CommonJobInfo: CommonJobInfo{ + ID: uuid.GenerateIDWithLength("job", 5), + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "busybox", + }, + Role: string(schema.RoleWorker), + Replicas: 1, + }, + }, + Type: schema.TypeSingle, + Framework: schema.FrameworkStandalone, + }, + }, + wantErr: false, + responseCode: 400, + }, + { + name: "create pod failed, image absent", + args: args{ + ctx: &logger.RequestContext{ + UserName: mockRootUser, + }, + req: &CreateJobInfo{ + Members: []MemberSpec{ + { + CommonJobInfo: CommonJobInfo{ + ID: uuid.GenerateIDWithLength("job", 5), + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{}, + Role: string(schema.RoleWorker), + Replicas: 1, + }, + }, + Type: schema.TypeSingle, + Framework: schema.FrameworkStandalone, + }, + }, + wantErr: true, + responseCode: 400, + }, + { + name: "create paddleJob success request", args: args{ ctx: &logger.RequestContext{ UserName: mockRootUser, @@ -106,7 +168,378 @@ func TestCreatePFJob(t *testing.T) { Queue: MockQueueName, }, }, - Framework: schema.FrameworkStandalone, + Type: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + Members: []MemberSpec{ + { + Replicas: 1, + Role: string(schema.RolePServer), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + }, + }, + { + Replicas: 1, + Role: string(schema.RolePWorker), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + }, + }, + }, + ExtensionTemplate: map[string]interface{}{ + "a": "b", + }, + }, + }, + wantErr: false, + responseCode: 400, + }, + { + name: "paddleJob flavour validate cpu failed,err: cpu cannot be negative", + args: args{ + ctx: &logger.RequestContext{ + UserName: mockRootUser, + }, + req: &CreateJobInfo{ + CommonJobInfo: CommonJobInfo{ + ID: uuid.GenerateIDWithLength("job", 5), + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + Type: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + Members: []MemberSpec{ + { + Replicas: 1, + Role: string(schema.RolePServer), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + Flavour: schema.Flavour{ + ResourceInfo: schema.ResourceInfo{CPU: "-1", Mem: "3"}}, + }, + }, + { + Replicas: 1, + Role: string(schema.RolePWorker), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + }, + }, + }, + ExtensionTemplate: map[string]interface{}{ + "a": "b", + }, + }, + }, + wantErr: true, + responseCode: 400, + }, + { + name: "paddleJob flavour validate cpu failed,err: cpu cannot be 0", + args: args{ + ctx: &logger.RequestContext{ + UserName: mockRootUser, + }, + req: &CreateJobInfo{ + CommonJobInfo: CommonJobInfo{ + ID: uuid.GenerateIDWithLength("job", 5), + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + Type: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + Members: []MemberSpec{ + { + Replicas: 1, + Role: string(schema.RolePServer), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + Flavour: schema.Flavour{ + ResourceInfo: schema.ResourceInfo{CPU: "0", Mem: "3"}}, + }, + }, + { + Replicas: 1, + Role: string(schema.RolePWorker), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + }, + }, + }, + ExtensionTemplate: map[string]interface{}{ + "a": "b", + }, + }, + }, + wantErr: true, + responseCode: 400, + }, + { + name: "priority 0 err: invalid job priority", + args: args{ + ctx: &logger.RequestContext{ + UserName: mockRootUser, + }, + req: &CreateJobInfo{ + CommonJobInfo: CommonJobInfo{ + ID: uuid.GenerateIDWithLength("job", 5), + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + Type: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + Members: []MemberSpec{ + { + Replicas: 1, + Role: string(schema.RolePServer), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + Priority: "a", + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + Flavour: schema.Flavour{ + ResourceInfo: schema.ResourceInfo{CPU: "1", Mem: "3"}}, + }, + }, + { + Replicas: 1, + Role: string(schema.RolePWorker), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + }, + }, + }, + ExtensionTemplate: map[string]interface{}{ + "a": "b", + }, + }, + }, + wantErr: true, + responseCode: 400, + }, + { + name: "schedulingPolicy.Queue should be the same", + args: args{ + ctx: &logger.RequestContext{ + UserName: mockRootUser, + }, + req: &CreateJobInfo{ + CommonJobInfo: CommonJobInfo{ + ID: uuid.GenerateIDWithLength("job", 5), + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + Type: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + Members: []MemberSpec{ + { + Replicas: 1, + Role: string(schema.RolePServer), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: "a", + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + }, + }, + { + Replicas: 1, + Role: string(schema.RolePWorker), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + }, + }, + }, + ExtensionTemplate: map[string]interface{}{ + "a": "b", + }, + }, + }, + wantErr: true, + responseCode: 400, + }, + { + name: "the role[master] for framework paddle is not supported", + args: args{ + ctx: &logger.RequestContext{ + UserName: mockRootUser, + }, + req: &CreateJobInfo{ + CommonJobInfo: CommonJobInfo{ + ID: uuid.GenerateIDWithLength("job", 5), + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + Type: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + Members: []MemberSpec{ + { + Replicas: 1, + Role: string(schema.RoleMaster), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + }, + }, + { + Replicas: 1, + Role: string(schema.RolePWorker), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + }, + }, + }, + ExtensionTemplate: map[string]interface{}{ + "a": "b", + }, + }, + }, + wantErr: true, + responseCode: 400, + }, + { + name: "create paddleJob success request", + args: args{ + ctx: &logger.RequestContext{ + UserName: mockRootUser, + }, + req: &CreateJobInfo{ + CommonJobInfo: CommonJobInfo{ + ID: uuid.GenerateIDWithLength("job", 5), + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + Type: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + ExtensionTemplate: map[string]interface{}{ + "a": "b", + }, }, }, wantErr: false, @@ -294,6 +727,7 @@ func TestCreatePFJob(t *testing.T) { assert.Error(t, err) t.Logf("name=%s err: %v", tt.name, err) } else { + assert.Equal(t, nil, err) t.Logf("response: %+v", res) } }) diff --git a/pkg/job/runtime_v2/job/paddle/kube_paddle_job.go b/pkg/job/runtime_v2/job/paddle/kube_paddle_job.go index 63d660cfb..1413564d7 100644 --- a/pkg/job/runtime_v2/job/paddle/kube_paddle_job.go +++ b/pkg/job/runtime_v2/job/paddle/kube_paddle_job.go @@ -76,6 +76,10 @@ func (pj *KubePaddleJob) Submit(ctx context.Context, job *api.PFJob) error { log.Errorf("create %s failed, err %v", pj.String(jobName), err) return err } + if err := pj.validatePaddleContainers(pdj, job); err != nil { + log.Errorf("validate paddlejob %s failed, err: %v", pj.String(jobName), err) + return err + } // set metadata field kuberuntime.BuildJobMetadata(&pdj.ObjectMeta, job) @@ -84,6 +88,7 @@ func (pj *KubePaddleJob) Submit(ctx context.Context, job *api.PFJob) error { log.Errorf("build scheduling policy for %s failed, err: %v", pj.String(jobName), err) return err } + // build job spec field if job.IsCustomYaml { // set custom PaddleJob Spec from user @@ -106,6 +111,7 @@ func (pj *KubePaddleJob) Submit(ctx context.Context, job *api.PFJob) error { } func (pj *KubePaddleJob) customPaddleJob(pdj *paddlejobv1.PaddleJob, job *api.PFJob) error { + log.Infof("customPaddleJob fill resource") if pdj == nil || job == nil { return fmt.Errorf("jobSpec or PFJob is nil") } @@ -113,6 +119,10 @@ func (pj *KubePaddleJob) customPaddleJob(pdj *paddlejobv1.PaddleJob, job *api.PF log.Errorf("validate custom yaml for %s failed, err: %v", pj.String(job.ID), err) return err } + if err := pj.patchResource(pdj, job); err != nil { + log.Errorf("patch resource for paddlejob %s failed, err: %v", pj.String(job.ID), err) + return err + } return nil } @@ -382,3 +392,79 @@ func (pj *KubePaddleJob) getJobStatus(jobStatus *paddlejobv1.PaddleJobStatus) (p } return status, msg, nil } + +func (pj *KubePaddleJob) patchResource(pdj *paddlejobv1.PaddleJob, job *api.PFJob) error { + log.Infof("patch resource in paddlejob") + if len(job.Tasks) == 0 { + log.Debugf("no resources to be configured") + return nil + } + + // fill resource + var minAvailable int32 + minResources := resources.EmptyResource() + for _, task := range job.Tasks { + if pfschema.IsEmptyResource(task.Flavour.ResourceInfo) { + continue + } + resourceRequirements, err := kuberuntime.GenerateResourceRequirements(task.Flavour) + if err != nil { + log.Errorf("generate resource requirements failed, err: %v", err) + return err + } + switch task.Role { + case pfschema.RolePServer: + pdj.Spec.PS.Template.Spec.Containers[0].Resources = resourceRequirements + case pfschema.RolePWorker, pfschema.RoleWorker: + pdj.Spec.Worker.Template.Spec.Containers[0].Resources = resourceRequirements + default: + err = fmt.Errorf("role %s is not supported", task.Role) + } + if err != nil { + log.Errorf("build task for paddle job with role %s failed, err: %v", task.Role, err) + return err + } + // calculate min resources + taskResources, err := resources.NewResourceFromMap(task.Flavour.ToMap()) + if err != nil { + log.Errorf("parse resources for %s task failed, err: %v", pj.String(job.ID), err) + return err + } + taskResources.Multi(task.Replicas) + minResources.Add(taskResources) + // calculate min available + minAvailable += int32(task.Replicas) + } + // set minAvailable and minResources for paddle job + if pdj.Spec.SchedulingPolicy != nil { + pdj.Spec.SchedulingPolicy.MinAvailable = &minAvailable + pdj.Spec.SchedulingPolicy.MinResources = k8s.NewResourceList(minResources) + } + return nil +} + +func (pj *KubePaddleJob) validatePaddleContainers(pdj *paddlejobv1.PaddleJob, job *api.PFJob) error { + nilContainerErr := fmt.Errorf("container is required in paddleJob") + if pdj.Spec.PS == nil && job.JobMode == pfschema.EnvJobModePS { + err := fmt.Errorf("PS mode required spec.PS") + log.Errorln(err) + return err + } + if pdj.Spec.PS != nil { + psContainers := pdj.Spec.PS.Template.Spec.Containers + if len(psContainers) == 0 { + log.Errorln(nilContainerErr) + return nilContainerErr + } + } + if pdj.Spec.Worker == nil { + err := fmt.Errorf("worker is required in paddleJob") + log.Errorln(err) + return err + } + if len(pdj.Spec.Worker.Template.Spec.Containers) == 0 { + log.Errorln(nilContainerErr) + return nilContainerErr + } + return nil +} diff --git a/pkg/job/runtime_v2/job/paddle/kube_paddle_job_test.go b/pkg/job/runtime_v2/job/paddle/kube_paddle_job_test.go index 8dd5968b8..0f1e8660c 100644 --- a/pkg/job/runtime_v2/job/paddle/kube_paddle_job_test.go +++ b/pkg/job/runtime_v2/job/paddle/kube_paddle_job_test.go @@ -49,6 +49,13 @@ spec: containers: - name: paddle image: nginx + resources: + limits: + cpu: "1" + memory: 1Gi + requests: + cpu: "1" + memory: 1Gi ps: replicas: 2 template: @@ -56,6 +63,114 @@ spec: containers: - name: paddle image: nginx + resources: + limits: + cpu: "2" + memory: 2Gi + requests: + cpu: "2" + memory: 2Gi +` + nilWorkerContainerYaml = ` +apiVersion: batch.paddlepaddle.org/v1 +kind: PaddleJob +metadata: + name: default-name +spec: + withGloo: 1 + intranet: PodIP + cleanPodPolicy: OnCompletion + worker: + replicas: 2 + + ps: + replicas: 2 + template: + spec: + containers: + - name: paddle + image: nginx + resources: + limits: + cpu: "1" + memory: 1Gi + requests: + cpu: "1" + memory: 1Gi +` + nilPSContainerYaml = ` +apiVersion: batch.paddlepaddle.org/v1 +kind: PaddleJob +metadata: + name: default-name +spec: + withGloo: 1 + intranet: PodIP + cleanPodPolicy: OnCompletion + worker: + replicas: 2 + template: + spec: + containers: + - name: paddle + image: nginx + resources: + limits: + cpu: "1" + memory: 1Gi + requests: + cpu: "1" + memory: 1Gi + ps: + replicas: 2 +` + extensionPaddleYamlNilPS = ` +apiVersion: batch.paddlepaddle.org/v1 +kind: PaddleJob +metadata: + name: default-name +spec: + withGloo: 1 + intranet: PodIP + cleanPodPolicy: OnCompletion + worker: + replicas: 2 + template: + spec: + containers: + - name: paddle + image: nginx + resources: + limits: + cpu: "1" + memory: 1Gi + requests: + cpu: "1" + memory: 1Gi +` + extensionPaddleYamlNilWorker = ` +apiVersion: batch.paddlepaddle.org/v1 +kind: PaddleJob +metadata: + name: default-name +spec: + withGloo: 1 + intranet: PodIP + cleanPodPolicy: OnCompletion + ps: + replicas: 2 + template: + spec: + containers: + - name: paddle + image: nginx + resources: + limits: + cpu: "1" + memory: 1Gi + requests: + cpu: "1" + memory: 1Gi ` mockPaddleJob = api.PFJob{ ID: "job-normal-0c272d0a", @@ -181,6 +296,253 @@ func TestPaddleJob_CreateJob(t *testing.T) { wantErr: errors.New("get default template failed, err: job template paddle-xx-job is not found"), wantMsg: "namespace is empty", }, + { + caseName: "extensionTemplate NilWorker", + jobObj: &api.PFJob{ + ID: "job-normal-0c272d0b", + Name: "", + Namespace: "default", + JobType: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + JobMode: schema.EnvJobModeCollective, + UserName: "root", + QueueID: "mockQueueID", + Conf: schema.Conf{ + Name: "normal", + Command: "sleep 200", + Image: "mockImage", + Flavour: schema.Flavour{Name: "mockFlavourName", ResourceInfo: schema.ResourceInfo{CPU: "3", Mem: "3"}}, + }, + Tasks: []schema.Member{ + { + ID: "task-normal-0001", + Replicas: 3, + Role: schema.RoleWorker, + Conf: schema.Conf{ + Name: "normal", + Command: "sleep 200", + Image: "mockImage", + Env: map[string]string{ + "PF_FS_ID": "fs-name_1", + "PF_JOB_CLUSTER_ID": "testClusterID", + "PF_JOB_FLAVOUR": "cpu", + "PF_JOB_ID": "", + "PF_JOB_NAMESPACE": "paddleflow", + "PF_JOB_PRIORITY": "NORMAL", + "PF_JOB_QUEUE_ID": "mockQueueID", + "PF_JOB_QUEUE_NAME": "mockQueueName", + schema.EnvJobType: string(schema.TypePaddleJob), + "PF_USER_NAME": "root", + }, + Flavour: schema.Flavour{Name: "cpu", ResourceInfo: schema.ResourceInfo{CPU: "2", Mem: "2"}}, + }, + }, + }, + ExtensionTemplate: []byte(extensionPaddleYamlNilWorker), + }, + wantErr: errors.New("worker is required in paddleJob"), + }, + { + caseName: "extensionTemplate NilWorker", + jobObj: &api.PFJob{ + ID: "job-normal-0c272d0b", + Name: "", + Namespace: "default", + JobType: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + JobMode: schema.EnvJobModeCollective, + UserName: "root", + QueueID: "mockQueueID", + Conf: schema.Conf{ + Name: "normal", + Command: "sleep 200", + Image: "mockImage", + Flavour: schema.Flavour{Name: "mockFlavourName", ResourceInfo: schema.ResourceInfo{CPU: "3", Mem: "3"}}, + }, + Tasks: []schema.Member{ + { + ID: "task-normal-0001", + Replicas: 3, + Role: schema.RoleWorker, + Conf: schema.Conf{ + Name: "normal", + Command: "sleep 200", + Image: "mockImage", + Flavour: schema.Flavour{Name: "cpu", ResourceInfo: schema.ResourceInfo{CPU: "-2", Mem: "2"}}, + }, + }, + }, + ExtensionTemplate: []byte(extensionPaddleYaml), + }, + wantErr: errors.New("negative resources not permitted: map[cpu:-2 memory:2]"), + }, + { + caseName: "extensionTemplate NilPS", + jobObj: &api.PFJob{ + ID: "job-normal-0c272d0c", + Name: "", + Namespace: "default", + JobType: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + JobMode: schema.EnvJobModePS, + UserName: "root", + QueueID: "mockQueueID", + Tasks: []schema.Member{ + { + ID: "task-normal-0001", + Replicas: 1, + Role: schema.RoleWorker, + Conf: schema.Conf{ + Name: "normal", + Command: "sleep 200", + Image: "mockImage", + Env: map[string]string{ + "PF_FS_ID": "fs-name_1", + "PF_JOB_CLUSTER_ID": "testClusterID", + "PF_JOB_FLAVOUR": "cpu", + "PF_JOB_ID": "", + "PF_JOB_NAMESPACE": "paddleflow", + "PF_JOB_PRIORITY": "NORMAL", + "PF_JOB_QUEUE_ID": "mockQueueID", + "PF_JOB_QUEUE_NAME": "mockQueueName", + schema.EnvJobType: string(schema.TypePaddleJob), + "PF_USER_NAME": "root", + }, + Flavour: schema.Flavour{Name: "cpu", ResourceInfo: schema.ResourceInfo{CPU: "2", Mem: "2"}}, + }, + }, + { + ID: "task-normal-0001", + Replicas: 1, + Role: schema.RolePServer, + }, + }, + ExtensionTemplate: []byte(extensionPaddleYamlNilPS), + }, + wantErr: errors.New("PS mode required spec.PS"), + }, + { + caseName: "extensionTemplate NilWorkerContainer", + jobObj: &api.PFJob{ + ID: "job-normal-0c272d0c", + Name: "", + Namespace: "default", + JobType: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + JobMode: schema.EnvJobModePS, + UserName: "root", + QueueID: "mockQueueID", + Tasks: []schema.Member{ + { + ID: "task-normal-0001", + Replicas: 1, + Role: schema.RoleWorker, + Conf: schema.Conf{ + Name: "normal", + Command: "sleep 200", + Image: "mockImage", + Env: map[string]string{ + "PF_FS_ID": "fs-name_1", + "PF_JOB_CLUSTER_ID": "testClusterID", + "PF_JOB_FLAVOUR": "cpu", + "PF_JOB_ID": "", + "PF_JOB_NAMESPACE": "paddleflow", + "PF_JOB_PRIORITY": "NORMAL", + "PF_JOB_QUEUE_ID": "mockQueueID", + "PF_JOB_QUEUE_NAME": "mockQueueName", + schema.EnvJobType: string(schema.TypePaddleJob), + "PF_USER_NAME": "root", + }, + Flavour: schema.Flavour{Name: "cpu", ResourceInfo: schema.ResourceInfo{CPU: "2", Mem: "2"}}, + }, + }, + { + ID: "task-normal-0001", + Replicas: 1, + Role: schema.RolePServer, + }, + }, + ExtensionTemplate: []byte(nilWorkerContainerYaml), + }, + wantErr: errors.New("container is required in paddleJob"), + }, + { + caseName: "extensionTemplate NilWorkerContainer", + jobObj: &api.PFJob{ + ID: "job-normal-0c272d0c", + Name: "", + Namespace: "default", + JobType: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + JobMode: schema.EnvJobModePS, + UserName: "root", + QueueID: "mockQueueID", + Tasks: []schema.Member{ + { + ID: "task-normal-0001", + Replicas: 1, + Role: schema.RoleWorker, + Conf: schema.Conf{ + Name: "normal", + Command: "sleep 200", + Image: "mockImage", + Env: map[string]string{ + "PF_FS_ID": "fs-name_1", + "PF_JOB_CLUSTER_ID": "testClusterID", + "PF_JOB_FLAVOUR": "cpu", + "PF_JOB_ID": "", + "PF_JOB_NAMESPACE": "paddleflow", + "PF_JOB_PRIORITY": "NORMAL", + "PF_JOB_QUEUE_ID": "mockQueueID", + "PF_JOB_QUEUE_NAME": "mockQueueName", + schema.EnvJobType: string(schema.TypePaddleJob), + "PF_USER_NAME": "root", + }, + Flavour: schema.Flavour{Name: "cpu", ResourceInfo: schema.ResourceInfo{CPU: "2", Mem: "2"}}, + }, + }, + { + ID: "task-normal-0001", + Replicas: 1, + Role: schema.RolePServer, + }, + }, + ExtensionTemplate: []byte(nilPSContainerYaml), + }, + wantErr: errors.New("container is required in paddleJob"), + }, + { + caseName: "extensionTemplate member has no flavour", + jobObj: &api.PFJob{ + ID: "job-normal-0c272d0c", + Name: "", + Namespace: "default", + JobType: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + JobMode: schema.EnvJobModePS, + UserName: "root", + QueueID: "mockQueueID", + Tasks: []schema.Member{ + { + ID: "task-normal-0001", + Replicas: 1, + Role: schema.RoleWorker, + Conf: schema.Conf{ + Name: "normal", + Command: "sleep 200", + Image: "mockImage", + }, + }, + { + ID: "task-normal-0001", + Replicas: 1, + Role: schema.RolePServer, + }, + }, + ExtensionTemplate: []byte(extensionPaddleYaml), + }, + wantErr: nil, + }, { caseName: "create paddle job with Collective mode", jobObj: &mockPaddleJob, @@ -188,7 +550,7 @@ func TestPaddleJob_CreateJob(t *testing.T) { wantMsg: "", }, { - caseName: "job test3", + caseName: "job test ps mode", jobObj: &mockPaddlePSJob, wantErr: nil, wantMsg: "", @@ -198,17 +560,20 @@ func TestPaddleJob_CreateJob(t *testing.T) { paddleJob := New(kubeRuntimeClient) for _, test := range tests { t.Run(test.caseName, func(t *testing.T) { + t.Logf("run case[%s]", test.caseName) err := paddleJob.Submit(context.TODO(), test.jobObj) if test.wantErr == nil { assert.Equal(t, test.wantErr, err) t.Logf("case[%s] to CreateJob, paddleFlowJob=%+v", test.caseName, test.jobObj) - _, err := kubeRuntimeClient.Get(test.jobObj.Namespace, test.jobObj.ID, KubePaddleFwVersion) + obj, err := kubeRuntimeClient.Get(test.jobObj.Namespace, test.jobObj.ID, KubePaddleFwVersion) if !assert.NoError(t, err) { t.Errorf(err.Error()) } + t.Logf("result: %v", obj) } else { - assert.NotNil(t, err) - assert.Equal(t, test.wantErr.Error(), err.Error()) + if assert.Error(t, err) { + assert.Equal(t, test.wantErr.Error(), err.Error()) + } } }) }