From acc7852599f2f527853a708c712c2ffc3ab37394 Mon Sep 17 00:00:00 2001 From: zhongzichao Date: Wed, 23 Nov 2022 19:57:42 +0800 Subject: [PATCH 01/10] add paddle job resource support when using extensiontemplate --- pkg/apiserver/controller/job/create.go | 114 +++++++++++++++++- .../runtime_v2/job/paddle/kube_paddle_job.go | 78 +++++++++++- 2 files changed, 187 insertions(+), 5 deletions(-) diff --git a/pkg/apiserver/controller/job/create.go b/pkg/apiserver/controller/job/create.go index 6d40f279e..c801b1c6a 100644 --- a/pkg/apiserver/controller/job/create.go +++ b/pkg/apiserver/controller/job/create.go @@ -104,8 +104,27 @@ func validateJob(ctx *logger.RequestContext, request *CreateJobInfo) error { } if len(request.ExtensionTemplate) != 0 { - // extension template from user - ctx.Logging().Infof("request ExtensionTemplate is not empty, pass validate members") + // validate extension template from user + if len(request.Members) == 0 { + ctx.Logging().Infof("request ExtensionTemplate pass validate nil members") + return nil + } + // todo validateMembers using these function + // members not nil, continue validate + if err := validateMembersRole(ctx, request); err != nil { + ctx.Logging().Errorf("validate members role failed, err: %v", err) + return err + } + // validate scheduleInfo in members + if err := validateMembersScheduleInfo(ctx, request); err != nil { + ctx.Logging().Errorf("validate members role failed, err: %v", err) + return err + } + // validate resource in members + if err := validateMembersResource(ctx, request); err != nil { + ctx.Logging().Errorf("validate members role failed, err: %v", err) + return err + } } else { // validate members if err := validateJobMembers(ctx, request); err != nil { @@ -116,6 +135,59 @@ func validateJob(ctx *logger.RequestContext, request *CreateJobInfo) error { return nil } +// validateScheduleInfo include +func validateMembersScheduleInfo(ctx *logger.RequestContext, request *CreateJobInfo) error { + var err error + // validate queue + for _, member := range request.Members { + if err = validateMembersQueue(ctx, &member, request.SchedulingPolicy); err != nil { + ctx.Logging().Errorf("Failed to check Members' Queue: %v", err) + return err + } + // check members priority + if err = checkPriority(&member.SchedulingPolicy, &request.SchedulingPolicy); err != nil { + ctx.Logging().Errorf("Failed to check priority: %v", err) + return err + } + } + return nil +} + +func validateMembersResource(ctx *logger.RequestContext, request *CreateJobInfo) error { + var err error + sumResource := resources.EmptyResource() + for index, member := range request.Members { + member.Flavour, err = flavour.GetFlavourWithCheck(member.Flavour) + if err != nil { + log.Errorf("get flavour failed, err:%v", err) + return err + } + request.Members[index].Flavour.ResourceInfo = member.Flavour.ResourceInfo + // sum = sum + member.Replicas * member.Flavour.ResourceInfo + memberRes, err := resources.NewResourceFromMap(member.Flavour.ResourceInfo.ToMap()) + if err != nil { + ctx.Logging().Errorf("Failed to multiply replicas=%d and resourceInfo=%v, err: %v", member.Replicas, member.Flavour.ResourceInfo, err) + ctx.ErrorCode = common.JobInvalidField + return err + } + ctx.Logging().Debugf("member resource info %v", member.Flavour.ResourceInfo) + if memberRes.CPU() == 0 || memberRes.Memory() == 0 { + err = fmt.Errorf("flavour[%v] cpu or memory is empty", memberRes) + ctx.Logging().Errorf("Failed to check flavour: %v", err) + return err + } + memberRes.Multi(member.Replicas) + sumResource.Add(memberRes) + } + // validate queue and total-member-resource + if !sumResource.LessEqual(request.SchedulingPolicy.MaxResources) { + errMsg := fmt.Sprintf("the flavour[%+v] is larger than queue's [%+v]", sumResource, request.SchedulingPolicy.MaxResources) + ctx.Logging().Errorf(errMsg) + return fmt.Errorf(errMsg) + } + return nil +} + func validateCommonJobInfo(ctx *logger.RequestContext, requestCommonJobInfo *CommonJobInfo) error { // validate job id if requestCommonJobInfo.ID != "" { @@ -146,6 +218,29 @@ func validateCommonJobInfo(ctx *logger.RequestContext, requestCommonJobInfo *Com return nil } +func validateMembersRole(ctx *logger.RequestContext, request *CreateJobInfo) error { + if len(request.Members) == 0 { + log.Debugf("Members in request is nil") + } + frameworkRoles := getFrameworkRoles(request.Framework) + for _, member := range request.Members { + memberRole := schema.MemberRole(member.Role) + _, find := frameworkRoles[memberRole] + if !find { + err := fmt.Errorf("the role[%s] for framework %s is not supported", member.Role, request.Framework) + ctx.Logging().Errorf("Failed to check Members' role, err: %v", err) + return err + } + } + var err error + request.Mode, err = checkMemberRole(request.Framework, frameworkRoles) + if err != nil { + ctx.Logging().Errorf("check member role for framework %s failed, err: %v", request.Framework, err) + return err + } + return nil +} + func validateJobMembers(ctx *logger.RequestContext, request *CreateJobInfo) error { if len(request.Members) == 0 { err := fmt.Errorf("request.Members is empty") @@ -487,9 +582,10 @@ func buildJob(request *CreateJobInfo) (*model.Job, error) { var members []schema.Member var templateJson string var err error - if len(request.ExtensionTemplate) == 0 { + if len(request.Members) != 0 { members = buildMembers(request) - } else { + } + if len(request.ExtensionTemplate) != 0 { templateJson, err = newExtensionTemplateJson(request.ExtensionTemplate) if err != nil { log.Errorf("parse extension template failed, err: %v", err) @@ -497,6 +593,16 @@ func buildJob(request *CreateJobInfo) (*model.Job, error) { } } + //if len(request.ExtensionTemplate) == 0 { + // members = buildMembers(request) + //} else { + // templateJson, err = newExtensionTemplateJson(request.ExtensionTemplate) + // if err != nil { + // log.Errorf("parse extension template failed, err: %v", err) + // return nil, err + // } + //} + jobInfo := &model.Job{ ID: request.ID, Name: request.Name, diff --git a/pkg/job/runtime_v2/job/paddle/kube_paddle_job.go b/pkg/job/runtime_v2/job/paddle/kube_paddle_job.go index 63d660cfb..f0e315752 100644 --- a/pkg/job/runtime_v2/job/paddle/kube_paddle_job.go +++ b/pkg/job/runtime_v2/job/paddle/kube_paddle_job.go @@ -76,6 +76,10 @@ func (pj *KubePaddleJob) Submit(ctx context.Context, job *api.PFJob) error { log.Errorf("create %s failed, err %v", pj.String(jobName), err) return err } + if err := pj.validatePodContainers(pdj); err != nil { + log.Errorf("validate paddlejob %s failed, err: %v", pj.String(jobName), err) + return err + } // set metadata field kuberuntime.BuildJobMetadata(&pdj.ObjectMeta, job) @@ -84,6 +88,7 @@ func (pj *KubePaddleJob) Submit(ctx context.Context, job *api.PFJob) error { log.Errorf("build scheduling policy for %s failed, err: %v", pj.String(jobName), err) return err } + // build job spec field if job.IsCustomYaml { // set custom PaddleJob Spec from user @@ -113,6 +118,10 @@ func (pj *KubePaddleJob) customPaddleJob(pdj *paddlejobv1.PaddleJob, job *api.PF log.Errorf("validate custom yaml for %s failed, err: %v", pj.String(job.ID), err) return err } + if err := pj.patchResource(pdj, job); err != nil { + log.Errorf("patch resource for paddlejob %s failed, err: %v", pj.String(job.ID), err) + return err + } return nil } @@ -233,7 +242,7 @@ func (pj *KubePaddleJob) patchPaddleTask(resourceSpec *paddlejobv1.ResourceSpec, return kuberuntime.BuildPodSpec(&resourceSpec.Template.Spec, task) } -func (pj *KubePaddleJob) Stop(ctx context.Context, job *api.PFJob) error { +func (pj *KubePaddleJob) Stop(cctx context.Context, job *api.PFJob) error { if job == nil { return fmt.Errorf("job is nil") } @@ -382,3 +391,70 @@ func (pj *KubePaddleJob) getJobStatus(jobStatus *paddlejobv1.PaddleJobStatus) (p } return status, msg, nil } + +func (pj *KubePaddleJob) patchResource(pdj *paddlejobv1.PaddleJob, job *api.PFJob) error { + log.Infof("patch resource in paddlejob") + if len(job.Tasks) == 0 { + log.Debugf("no resources to be configured") + } + + // fill resource + var minAvailable int32 + minResources := resources.EmptyResource() + for _, task := range job.Tasks { + resourceRequirements, err := kuberuntime.GenerateResourceRequirements(task.Flavour) + if err != nil { + log.Errorf("generate resource requirements failed, err: %v", err) + return err + } + switch task.Role { + case pfschema.RolePServer: + pdj.Spec.PS.Template.Spec.Containers[0].Resources = resourceRequirements + case pfschema.RolePWorker, pfschema.RoleWorker: + pdj.Spec.Worker.Template.Spec.Containers[0].Resources = resourceRequirements + default: + err = fmt.Errorf("role %s is not supported", task.Role) + } + if err != nil { + log.Errorf("build task for paddle job with role %s failed, err: %v", task.Role, err) + return err + } + // calculate min resources + taskResources, err := resources.NewResourceFromMap(task.Flavour.ToMap()) + if err != nil { + log.Errorf("parse resources for %s task failed, err: %v", pj.String(job.ID), err) + return err + } + taskResources.Multi(task.Replicas) + minResources.Add(taskResources) + // calculate min available + minAvailable += int32(task.Replicas) + } + // set minAvailable and minResources for paddle job + if pdj.Spec.SchedulingPolicy != nil { + pdj.Spec.SchedulingPolicy.MinAvailable = &minAvailable + pdj.Spec.SchedulingPolicy.MinResources = k8s.NewResourceList(minResources) + } + return nil +} + +func (pj *KubePaddleJob) validatePodContainers(pdj *paddlejobv1.PaddleJob) error { + nilContainerErr := fmt.Errorf("worker is required in paddleJob") + if pdj.Spec.PS != nil { + psContainers := pdj.Spec.PS.Template.Spec.Containers + if len(psContainers) == 0 { + log.Errorln(nilContainerErr) + return nilContainerErr + } + } + if pdj.Spec.Worker == nil { + err := fmt.Errorf("worker is required in paddleJob") + log.Errorln(err) + return err + } + if len(pdj.Spec.Worker.Template.Spec.Containers) == 0 { + log.Errorln(nilContainerErr) + return nilContainerErr + } + return nil +} From 8e1563b7a4a91f83cebef3d2c20c9e510f1e7794 Mon Sep 17 00:00:00 2001 From: zhongzichao Date: Wed, 23 Nov 2022 21:19:57 +0800 Subject: [PATCH 02/10] add paddle job resource support when using extensiontemplate --- pkg/apiserver/controller/job/create.go | 12 +- pkg/apiserver/controller/job/create_test.go | 165 +++++++++++++++++- .../runtime_v2/job/paddle/kube_paddle_job.go | 2 +- 3 files changed, 166 insertions(+), 13 deletions(-) diff --git a/pkg/apiserver/controller/job/create.go b/pkg/apiserver/controller/job/create.go index c801b1c6a..5041b4c46 100644 --- a/pkg/apiserver/controller/job/create.go +++ b/pkg/apiserver/controller/job/create.go @@ -219,6 +219,7 @@ func validateCommonJobInfo(ctx *logger.RequestContext, requestCommonJobInfo *Com } func validateMembersRole(ctx *logger.RequestContext, request *CreateJobInfo) error { + log.Infof("validate job %s MembersRole", request.Name) if len(request.Members) == 0 { log.Debugf("Members in request is nil") } @@ -231,6 +232,7 @@ func validateMembersRole(ctx *logger.RequestContext, request *CreateJobInfo) err ctx.Logging().Errorf("Failed to check Members' role, err: %v", err) return err } + frameworkRoles[memberRole] = frameworkRoles[memberRole] + member.Replicas } var err error request.Mode, err = checkMemberRole(request.Framework, frameworkRoles) @@ -593,16 +595,6 @@ func buildJob(request *CreateJobInfo) (*model.Job, error) { } } - //if len(request.ExtensionTemplate) == 0 { - // members = buildMembers(request) - //} else { - // templateJson, err = newExtensionTemplateJson(request.ExtensionTemplate) - // if err != nil { - // log.Errorf("parse extension template failed, err: %v", err) - // return nil, err - // } - //} - jobInfo := &model.Job{ ID: request.ID, Name: request.Name, diff --git a/pkg/apiserver/controller/job/create_test.go b/pkg/apiserver/controller/job/create_test.go index fdafda5c0..18f201696 100644 --- a/pkg/apiserver/controller/job/create_test.go +++ b/pkg/apiserver/controller/job/create_test.go @@ -91,7 +91,69 @@ func TestCreatePFJob(t *testing.T) { responseCode: 400, }, { - name: "create success request", + name: "create pod success request", + args: args{ + ctx: &logger.RequestContext{ + UserName: mockRootUser, + }, + req: &CreateJobInfo{ + Members: []MemberSpec{ + { + CommonJobInfo: CommonJobInfo{ + ID: uuid.GenerateIDWithLength("job", 5), + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "busybox", + }, + Role: string(schema.RoleWorker), + Replicas: 1, + }, + }, + Type: schema.TypeSingle, + Framework: schema.FrameworkStandalone, + }, + }, + wantErr: false, + responseCode: 400, + }, + { + name: "create pod failed, image absent", + args: args{ + ctx: &logger.RequestContext{ + UserName: mockRootUser, + }, + req: &CreateJobInfo{ + Members: []MemberSpec{ + { + CommonJobInfo: CommonJobInfo{ + ID: uuid.GenerateIDWithLength("job", 5), + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{}, + Role: string(schema.RoleWorker), + Replicas: 1, + }, + }, + Type: schema.TypeSingle, + Framework: schema.FrameworkStandalone, + }, + }, + wantErr: true, + responseCode: 400, + }, + { + name: "create paddleJob success request", args: args{ ctx: &logger.RequestContext{ UserName: mockRootUser, @@ -106,12 +168,110 @@ func TestCreatePFJob(t *testing.T) { Queue: MockQueueName, }, }, - Framework: schema.FrameworkStandalone, + Type: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + Members: []MemberSpec{ + { + Replicas: 1, + Role: string(schema.RolePServer), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + }, + }, + { + Replicas: 1, + Role: string(schema.RolePWorker), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + }, + }, + }, + ExtensionTemplate: map[string]interface{}{ + "a": "b", + }, }, }, wantErr: false, responseCode: 400, }, + { + name: "framework paddle in collective mode, only setting role work", + args: args{ + ctx: &logger.RequestContext{ + UserName: mockRootUser, + }, + req: &CreateJobInfo{ + CommonJobInfo: CommonJobInfo{ + ID: uuid.GenerateIDWithLength("job", 5), + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + Type: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + Members: []MemberSpec{ + { + Replicas: 1, + Role: string(schema.RolePServer), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + }, + }, + { + Replicas: 1, + Role: string(schema.RolePWorker), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + }, + }, + }, + ExtensionTemplate: map[string]interface{}{ + "a": "b", + }, + }, + }, + wantErr: true, + responseCode: 400, + }, { name: "create mpijob success request", args: args{ @@ -294,6 +454,7 @@ func TestCreatePFJob(t *testing.T) { assert.Error(t, err) t.Logf("name=%s err: %v", tt.name, err) } else { + assert.Equal(t, nil, err) t.Logf("response: %+v", res) } }) diff --git a/pkg/job/runtime_v2/job/paddle/kube_paddle_job.go b/pkg/job/runtime_v2/job/paddle/kube_paddle_job.go index f0e315752..5aff691a3 100644 --- a/pkg/job/runtime_v2/job/paddle/kube_paddle_job.go +++ b/pkg/job/runtime_v2/job/paddle/kube_paddle_job.go @@ -242,7 +242,7 @@ func (pj *KubePaddleJob) patchPaddleTask(resourceSpec *paddlejobv1.ResourceSpec, return kuberuntime.BuildPodSpec(&resourceSpec.Template.Spec, task) } -func (pj *KubePaddleJob) Stop(cctx context.Context, job *api.PFJob) error { +func (pj *KubePaddleJob) Stop(ctx context.Context, job *api.PFJob) error { if job == nil { return fmt.Errorf("job is nil") } From 3126069e7a702b814b68b904cfeba3b2ec7288a4 Mon Sep 17 00:00:00 2001 From: zhongzichao Date: Wed, 23 Nov 2022 21:33:18 +0800 Subject: [PATCH 03/10] add paddle job resource support when using extensiontemplate --- .../job/paddle/kube_paddle_job_test.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/pkg/job/runtime_v2/job/paddle/kube_paddle_job_test.go b/pkg/job/runtime_v2/job/paddle/kube_paddle_job_test.go index 8dd5968b8..c434925ea 100644 --- a/pkg/job/runtime_v2/job/paddle/kube_paddle_job_test.go +++ b/pkg/job/runtime_v2/job/paddle/kube_paddle_job_test.go @@ -49,6 +49,13 @@ spec: containers: - name: paddle image: nginx + resources: + limits: + cpu: "1" + memory: 1Gi + requests: + cpu: "1" + memory: 1Gi ps: replicas: 2 template: @@ -56,6 +63,13 @@ spec: containers: - name: paddle image: nginx + resources: + limits: + cpu: "1" + memory: 1Gi + requests: + cpu: "1" + memory: 1Gi ` mockPaddleJob = api.PFJob{ ID: "job-normal-0c272d0a", @@ -188,7 +202,7 @@ func TestPaddleJob_CreateJob(t *testing.T) { wantMsg: "", }, { - caseName: "job test3", + caseName: "job test ps mode", jobObj: &mockPaddlePSJob, wantErr: nil, wantMsg: "", @@ -198,6 +212,7 @@ func TestPaddleJob_CreateJob(t *testing.T) { paddleJob := New(kubeRuntimeClient) for _, test := range tests { t.Run(test.caseName, func(t *testing.T) { + t.Logf("run case[%s]", test.caseName) err := paddleJob.Submit(context.TODO(), test.jobObj) if test.wantErr == nil { assert.Equal(t, test.wantErr, err) From cc3c81f2ae75c519a72e6b39d8e8bca3a0e1b867 Mon Sep 17 00:00:00 2001 From: zhongzichao Date: Wed, 23 Nov 2022 21:55:15 +0800 Subject: [PATCH 04/10] add paddle job resource support when using extensiontemplate --- pkg/apiserver/controller/job/create_test.go | 60 --------------------- 1 file changed, 60 deletions(-) diff --git a/pkg/apiserver/controller/job/create_test.go b/pkg/apiserver/controller/job/create_test.go index 18f201696..261907a58 100644 --- a/pkg/apiserver/controller/job/create_test.go +++ b/pkg/apiserver/controller/job/create_test.go @@ -212,66 +212,6 @@ func TestCreatePFJob(t *testing.T) { wantErr: false, responseCode: 400, }, - { - name: "framework paddle in collective mode, only setting role work", - args: args{ - ctx: &logger.RequestContext{ - UserName: mockRootUser, - }, - req: &CreateJobInfo{ - CommonJobInfo: CommonJobInfo{ - ID: uuid.GenerateIDWithLength("job", 5), - Name: "normal", - Labels: map[string]string{}, - Annotations: map[string]string{}, - SchedulingPolicy: SchedulingPolicy{ - Queue: MockQueueName, - }, - }, - Type: schema.TypeDistributed, - Framework: schema.FrameworkPaddle, - Members: []MemberSpec{ - { - Replicas: 1, - Role: string(schema.RolePServer), - CommonJobInfo: CommonJobInfo{ - Name: "normal", - Labels: map[string]string{}, - Annotations: map[string]string{}, - SchedulingPolicy: SchedulingPolicy{ - Queue: MockQueueName, - }, - }, - JobSpec: JobSpec{ - Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", - Command: "sleep 20", - }, - }, - { - Replicas: 1, - Role: string(schema.RolePWorker), - CommonJobInfo: CommonJobInfo{ - Name: "normal", - Labels: map[string]string{}, - Annotations: map[string]string{}, - SchedulingPolicy: SchedulingPolicy{ - Queue: MockQueueName, - }, - }, - JobSpec: JobSpec{ - Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", - Command: "sleep 20", - }, - }, - }, - ExtensionTemplate: map[string]interface{}{ - "a": "b", - }, - }, - }, - wantErr: true, - responseCode: 400, - }, { name: "create mpijob success request", args: args{ From 007ccb4c86b9a98157f2583d6a07bb4d798b9d29 Mon Sep 17 00:00:00 2001 From: zhongzichao Date: Wed, 23 Nov 2022 22:35:43 +0800 Subject: [PATCH 05/10] add paddle job resource support when using extensiontemplate --- pkg/apiserver/controller/job/create.go | 3 - pkg/apiserver/controller/job/create_test.go | 146 ++++++++++++++++++++ 2 files changed, 146 insertions(+), 3 deletions(-) diff --git a/pkg/apiserver/controller/job/create.go b/pkg/apiserver/controller/job/create.go index 5041b4c46..a7aab2f17 100644 --- a/pkg/apiserver/controller/job/create.go +++ b/pkg/apiserver/controller/job/create.go @@ -220,9 +220,6 @@ func validateCommonJobInfo(ctx *logger.RequestContext, requestCommonJobInfo *Com func validateMembersRole(ctx *logger.RequestContext, request *CreateJobInfo) error { log.Infof("validate job %s MembersRole", request.Name) - if len(request.Members) == 0 { - log.Debugf("Members in request is nil") - } frameworkRoles := getFrameworkRoles(request.Framework) for _, member := range request.Members { memberRole := schema.MemberRole(member.Role) diff --git a/pkg/apiserver/controller/job/create_test.go b/pkg/apiserver/controller/job/create_test.go index 261907a58..611f418c4 100644 --- a/pkg/apiserver/controller/job/create_test.go +++ b/pkg/apiserver/controller/job/create_test.go @@ -212,6 +212,152 @@ func TestCreatePFJob(t *testing.T) { wantErr: false, responseCode: 400, }, + { + name: "schedulingPolicy.Queue should be the same", + args: args{ + ctx: &logger.RequestContext{ + UserName: mockRootUser, + }, + req: &CreateJobInfo{ + CommonJobInfo: CommonJobInfo{ + ID: uuid.GenerateIDWithLength("job", 5), + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + Type: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + Members: []MemberSpec{ + { + Replicas: 1, + Role: string(schema.RolePServer), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: "a", + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + }, + }, + { + Replicas: 1, + Role: string(schema.RolePWorker), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + }, + }, + }, + ExtensionTemplate: map[string]interface{}{ + "a": "b", + }, + }, + }, + wantErr: true, + responseCode: 400, + }, + { + name: "the role[master] for framework paddle is not supported", + args: args{ + ctx: &logger.RequestContext{ + UserName: mockRootUser, + }, + req: &CreateJobInfo{ + CommonJobInfo: CommonJobInfo{ + ID: uuid.GenerateIDWithLength("job", 5), + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + Type: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + Members: []MemberSpec{ + { + Replicas: 1, + Role: string(schema.RoleMaster), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + }, + }, + { + Replicas: 1, + Role: string(schema.RolePWorker), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + }, + }, + }, + ExtensionTemplate: map[string]interface{}{ + "a": "b", + }, + }, + }, + wantErr: true, + responseCode: 400, + }, + { + name: "create paddleJob success request", + args: args{ + ctx: &logger.RequestContext{ + UserName: mockRootUser, + }, + req: &CreateJobInfo{ + CommonJobInfo: CommonJobInfo{ + ID: uuid.GenerateIDWithLength("job", 5), + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + Type: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + ExtensionTemplate: map[string]interface{}{ + "a": "b", + }, + }, + }, + wantErr: false, + responseCode: 400, + }, { name: "create mpijob success request", args: args{ From a2165e53763708302051bc90f276feda49a9deb2 Mon Sep 17 00:00:00 2001 From: zhongzichao Date: Thu, 24 Nov 2022 09:59:21 +0800 Subject: [PATCH 06/10] add paddle job resource support when using extensiontemplate --- pkg/apiserver/controller/job/create_test.go | 124 ++++++++++++++++++++ 1 file changed, 124 insertions(+) diff --git a/pkg/apiserver/controller/job/create_test.go b/pkg/apiserver/controller/job/create_test.go index 611f418c4..96c4ae1c0 100644 --- a/pkg/apiserver/controller/job/create_test.go +++ b/pkg/apiserver/controller/job/create_test.go @@ -212,6 +212,130 @@ func TestCreatePFJob(t *testing.T) { wantErr: false, responseCode: 400, }, + { + name: "paddleJob flavour validate cpu failed,err: cpu cannot be negative", + args: args{ + ctx: &logger.RequestContext{ + UserName: mockRootUser, + }, + req: &CreateJobInfo{ + CommonJobInfo: CommonJobInfo{ + ID: uuid.GenerateIDWithLength("job", 5), + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + Type: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + Members: []MemberSpec{ + { + Replicas: 1, + Role: string(schema.RolePServer), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + Flavour: schema.Flavour{ + ResourceInfo: schema.ResourceInfo{CPU: "-1", Mem: "3"}}, + }, + }, + { + Replicas: 1, + Role: string(schema.RolePWorker), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + }, + }, + }, + ExtensionTemplate: map[string]interface{}{ + "a": "b", + }, + }, + }, + wantErr: true, + responseCode: 400, + }, + { + name: "paddleJob flavour validate cpu failed,err: cpu cannot be 0", + args: args{ + ctx: &logger.RequestContext{ + UserName: mockRootUser, + }, + req: &CreateJobInfo{ + CommonJobInfo: CommonJobInfo{ + ID: uuid.GenerateIDWithLength("job", 5), + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + Type: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + Members: []MemberSpec{ + { + Replicas: 1, + Role: string(schema.RolePServer), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + Flavour: schema.Flavour{ + ResourceInfo: schema.ResourceInfo{CPU: "0", Mem: "3"}}, + }, + }, + { + Replicas: 1, + Role: string(schema.RolePWorker), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + }, + }, + }, + ExtensionTemplate: map[string]interface{}{ + "a": "b", + }, + }, + }, + wantErr: true, + responseCode: 400, + }, { name: "schedulingPolicy.Queue should be the same", args: args{ From a3d0c85b6950ac26bc9f765e4eb5927db8f29d25 Mon Sep 17 00:00:00 2001 From: zhongzichao Date: Thu, 24 Nov 2022 10:57:04 +0800 Subject: [PATCH 07/10] add paddle job resource support when using extensiontemplate --- .../runtime_v2/job/paddle/kube_paddle_job.go | 12 +- .../job/paddle/kube_paddle_job_test.go | 287 +++++++++++++++++- 2 files changed, 294 insertions(+), 5 deletions(-) diff --git a/pkg/job/runtime_v2/job/paddle/kube_paddle_job.go b/pkg/job/runtime_v2/job/paddle/kube_paddle_job.go index 5aff691a3..cd3ec61b0 100644 --- a/pkg/job/runtime_v2/job/paddle/kube_paddle_job.go +++ b/pkg/job/runtime_v2/job/paddle/kube_paddle_job.go @@ -76,7 +76,7 @@ func (pj *KubePaddleJob) Submit(ctx context.Context, job *api.PFJob) error { log.Errorf("create %s failed, err %v", pj.String(jobName), err) return err } - if err := pj.validatePodContainers(pdj); err != nil { + if err := pj.validatePaddleContainers(pdj, job); err != nil { log.Errorf("validate paddlejob %s failed, err: %v", pj.String(jobName), err) return err } @@ -111,6 +111,7 @@ func (pj *KubePaddleJob) Submit(ctx context.Context, job *api.PFJob) error { } func (pj *KubePaddleJob) customPaddleJob(pdj *paddlejobv1.PaddleJob, job *api.PFJob) error { + log.Infof("customPaddleJob fill resource") if pdj == nil || job == nil { return fmt.Errorf("jobSpec or PFJob is nil") } @@ -438,8 +439,13 @@ func (pj *KubePaddleJob) patchResource(pdj *paddlejobv1.PaddleJob, job *api.PFJo return nil } -func (pj *KubePaddleJob) validatePodContainers(pdj *paddlejobv1.PaddleJob) error { - nilContainerErr := fmt.Errorf("worker is required in paddleJob") +func (pj *KubePaddleJob) validatePaddleContainers(pdj *paddlejobv1.PaddleJob, job *api.PFJob) error { + nilContainerErr := fmt.Errorf("container is required in paddleJob") + if pdj.Spec.PS == nil && job.JobMode == pfschema.EnvJobModePS { + err := fmt.Errorf("PS mode required spec.PS") + log.Errorln(err) + return err + } if pdj.Spec.PS != nil { psContainers := pdj.Spec.PS.Template.Spec.Containers if len(psContainers) == 0 { diff --git a/pkg/job/runtime_v2/job/paddle/kube_paddle_job_test.go b/pkg/job/runtime_v2/job/paddle/kube_paddle_job_test.go index c434925ea..9e2177567 100644 --- a/pkg/job/runtime_v2/job/paddle/kube_paddle_job_test.go +++ b/pkg/job/runtime_v2/job/paddle/kube_paddle_job_test.go @@ -70,6 +70,107 @@ spec: requests: cpu: "1" memory: 1Gi +` + nilWorkerContainerYaml = ` +apiVersion: batch.paddlepaddle.org/v1 +kind: PaddleJob +metadata: + name: default-name +spec: + withGloo: 1 + intranet: PodIP + cleanPodPolicy: OnCompletion + worker: + replicas: 2 + + ps: + replicas: 2 + template: + spec: + containers: + - name: paddle + image: nginx + resources: + limits: + cpu: "1" + memory: 1Gi + requests: + cpu: "1" + memory: 1Gi +` + nilPSContainerYaml = ` +apiVersion: batch.paddlepaddle.org/v1 +kind: PaddleJob +metadata: + name: default-name +spec: + withGloo: 1 + intranet: PodIP + cleanPodPolicy: OnCompletion + worker: + replicas: 2 + template: + spec: + containers: + - name: paddle + image: nginx + resources: + limits: + cpu: "1" + memory: 1Gi + requests: + cpu: "1" + memory: 1Gi + ps: + replicas: 2 +` + extensionPaddleYamlNilPS = ` +apiVersion: batch.paddlepaddle.org/v1 +kind: PaddleJob +metadata: + name: default-name +spec: + withGloo: 1 + intranet: PodIP + cleanPodPolicy: OnCompletion + worker: + replicas: 2 + template: + spec: + containers: + - name: paddle + image: nginx + resources: + limits: + cpu: "1" + memory: 1Gi + requests: + cpu: "1" + memory: 1Gi +` + extensionPaddleYamlNilWorker = ` +apiVersion: batch.paddlepaddle.org/v1 +kind: PaddleJob +metadata: + name: default-name +spec: + withGloo: 1 + intranet: PodIP + cleanPodPolicy: OnCompletion + ps: + replicas: 2 + template: + spec: + containers: + - name: paddle + image: nginx + resources: + limits: + cpu: "1" + memory: 1Gi + requests: + cpu: "1" + memory: 1Gi ` mockPaddleJob = api.PFJob{ ID: "job-normal-0c272d0a", @@ -195,6 +296,187 @@ func TestPaddleJob_CreateJob(t *testing.T) { wantErr: errors.New("get default template failed, err: job template paddle-xx-job is not found"), wantMsg: "namespace is empty", }, + { + caseName: "extensionTemplate NilWorker", + jobObj: &api.PFJob{ + ID: "job-normal-0c272d0b", + Name: "", + Namespace: "default", + JobType: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + JobMode: schema.EnvJobModeCollective, + UserName: "root", + QueueID: "mockQueueID", + Conf: schema.Conf{ + Name: "normal", + Command: "sleep 200", + Image: "mockImage", + Flavour: schema.Flavour{Name: "mockFlavourName", ResourceInfo: schema.ResourceInfo{CPU: "3", Mem: "3"}}, + }, + Tasks: []schema.Member{ + { + ID: "task-normal-0001", + Replicas: 3, + Role: schema.RoleWorker, + Conf: schema.Conf{ + Name: "normal", + Command: "sleep 200", + Image: "mockImage", + Env: map[string]string{ + "PF_FS_ID": "fs-name_1", + "PF_JOB_CLUSTER_ID": "testClusterID", + "PF_JOB_FLAVOUR": "cpu", + "PF_JOB_ID": "", + "PF_JOB_NAMESPACE": "paddleflow", + "PF_JOB_PRIORITY": "NORMAL", + "PF_JOB_QUEUE_ID": "mockQueueID", + "PF_JOB_QUEUE_NAME": "mockQueueName", + schema.EnvJobType: string(schema.TypePaddleJob), + "PF_USER_NAME": "root", + }, + Flavour: schema.Flavour{Name: "cpu", ResourceInfo: schema.ResourceInfo{CPU: "2", Mem: "2"}}, + }, + }, + }, + ExtensionTemplate: []byte(extensionPaddleYamlNilWorker), + }, + wantErr: errors.New("worker is required in paddleJob"), + }, + { + caseName: "extensionTemplate NilPS", + jobObj: &api.PFJob{ + ID: "job-normal-0c272d0c", + Name: "", + Namespace: "default", + JobType: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + JobMode: schema.EnvJobModePS, + UserName: "root", + QueueID: "mockQueueID", + Tasks: []schema.Member{ + { + ID: "task-normal-0001", + Replicas: 1, + Role: schema.RoleWorker, + Conf: schema.Conf{ + Name: "normal", + Command: "sleep 200", + Image: "mockImage", + Env: map[string]string{ + "PF_FS_ID": "fs-name_1", + "PF_JOB_CLUSTER_ID": "testClusterID", + "PF_JOB_FLAVOUR": "cpu", + "PF_JOB_ID": "", + "PF_JOB_NAMESPACE": "paddleflow", + "PF_JOB_PRIORITY": "NORMAL", + "PF_JOB_QUEUE_ID": "mockQueueID", + "PF_JOB_QUEUE_NAME": "mockQueueName", + schema.EnvJobType: string(schema.TypePaddleJob), + "PF_USER_NAME": "root", + }, + Flavour: schema.Flavour{Name: "cpu", ResourceInfo: schema.ResourceInfo{CPU: "2", Mem: "2"}}, + }, + }, + { + ID: "task-normal-0001", + Replicas: 1, + Role: schema.RolePServer, + }, + }, + ExtensionTemplate: []byte(extensionPaddleYamlNilPS), + }, + wantErr: errors.New("PS mode required spec.PS"), + }, + { + caseName: "extensionTemplate NilWorkerContainer", + jobObj: &api.PFJob{ + ID: "job-normal-0c272d0c", + Name: "", + Namespace: "default", + JobType: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + JobMode: schema.EnvJobModePS, + UserName: "root", + QueueID: "mockQueueID", + Tasks: []schema.Member{ + { + ID: "task-normal-0001", + Replicas: 1, + Role: schema.RoleWorker, + Conf: schema.Conf{ + Name: "normal", + Command: "sleep 200", + Image: "mockImage", + Env: map[string]string{ + "PF_FS_ID": "fs-name_1", + "PF_JOB_CLUSTER_ID": "testClusterID", + "PF_JOB_FLAVOUR": "cpu", + "PF_JOB_ID": "", + "PF_JOB_NAMESPACE": "paddleflow", + "PF_JOB_PRIORITY": "NORMAL", + "PF_JOB_QUEUE_ID": "mockQueueID", + "PF_JOB_QUEUE_NAME": "mockQueueName", + schema.EnvJobType: string(schema.TypePaddleJob), + "PF_USER_NAME": "root", + }, + Flavour: schema.Flavour{Name: "cpu", ResourceInfo: schema.ResourceInfo{CPU: "2", Mem: "2"}}, + }, + }, + { + ID: "task-normal-0001", + Replicas: 1, + Role: schema.RolePServer, + }, + }, + ExtensionTemplate: []byte(nilWorkerContainerYaml), + }, + wantErr: errors.New("container is required in paddleJob"), + }, + { + caseName: "extensionTemplate NilWorkerContainer", + jobObj: &api.PFJob{ + ID: "job-normal-0c272d0c", + Name: "", + Namespace: "default", + JobType: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + JobMode: schema.EnvJobModePS, + UserName: "root", + QueueID: "mockQueueID", + Tasks: []schema.Member{ + { + ID: "task-normal-0001", + Replicas: 1, + Role: schema.RoleWorker, + Conf: schema.Conf{ + Name: "normal", + Command: "sleep 200", + Image: "mockImage", + Env: map[string]string{ + "PF_FS_ID": "fs-name_1", + "PF_JOB_CLUSTER_ID": "testClusterID", + "PF_JOB_FLAVOUR": "cpu", + "PF_JOB_ID": "", + "PF_JOB_NAMESPACE": "paddleflow", + "PF_JOB_PRIORITY": "NORMAL", + "PF_JOB_QUEUE_ID": "mockQueueID", + "PF_JOB_QUEUE_NAME": "mockQueueName", + schema.EnvJobType: string(schema.TypePaddleJob), + "PF_USER_NAME": "root", + }, + Flavour: schema.Flavour{Name: "cpu", ResourceInfo: schema.ResourceInfo{CPU: "2", Mem: "2"}}, + }, + }, + { + ID: "task-normal-0001", + Replicas: 1, + Role: schema.RolePServer, + }, + }, + ExtensionTemplate: []byte(nilPSContainerYaml), + }, + wantErr: errors.New("container is required in paddleJob"), + }, { caseName: "create paddle job with Collective mode", jobObj: &mockPaddleJob, @@ -222,8 +504,9 @@ func TestPaddleJob_CreateJob(t *testing.T) { t.Errorf(err.Error()) } } else { - assert.NotNil(t, err) - assert.Equal(t, test.wantErr.Error(), err.Error()) + if assert.Error(t, err) { + assert.Equal(t, test.wantErr.Error(), err.Error()) + } } }) } From 775a02be3e381ae6258d13cefb91fbd0fc06ef32 Mon Sep 17 00:00:00 2001 From: zhongzichao Date: Thu, 24 Nov 2022 11:16:03 +0800 Subject: [PATCH 08/10] add paddle job resource support when using extensiontemplate --- pkg/apiserver/controller/job/create_test.go | 63 +++++++++++++++++++ .../job/paddle/kube_paddle_job_test.go | 34 ++++++++++ 2 files changed, 97 insertions(+) diff --git a/pkg/apiserver/controller/job/create_test.go b/pkg/apiserver/controller/job/create_test.go index 96c4ae1c0..94f44d29c 100644 --- a/pkg/apiserver/controller/job/create_test.go +++ b/pkg/apiserver/controller/job/create_test.go @@ -336,6 +336,69 @@ func TestCreatePFJob(t *testing.T) { wantErr: true, responseCode: 400, }, + { + name: "priority 0 err: invalid job priority", + args: args{ + ctx: &logger.RequestContext{ + UserName: mockRootUser, + }, + req: &CreateJobInfo{ + CommonJobInfo: CommonJobInfo{ + ID: uuid.GenerateIDWithLength("job", 5), + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + Type: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + Members: []MemberSpec{ + { + Replicas: 1, + Role: string(schema.RolePServer), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + Priority: "a", + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + Flavour: schema.Flavour{ + ResourceInfo: schema.ResourceInfo{CPU: "1", Mem: "3"}}, + }, + }, + { + Replicas: 1, + Role: string(schema.RolePWorker), + CommonJobInfo: CommonJobInfo{ + Name: "normal", + Labels: map[string]string{}, + Annotations: map[string]string{}, + SchedulingPolicy: SchedulingPolicy{ + Queue: MockQueueName, + }, + }, + JobSpec: JobSpec{ + Image: "iregistry.baidu-int.com/bmlc/trainingjob:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu", + Command: "sleep 20", + }, + }, + }, + ExtensionTemplate: map[string]interface{}{ + "a": "b", + }, + }, + }, + wantErr: true, + responseCode: 400, + }, { name: "schedulingPolicy.Queue should be the same", args: args{ diff --git a/pkg/job/runtime_v2/job/paddle/kube_paddle_job_test.go b/pkg/job/runtime_v2/job/paddle/kube_paddle_job_test.go index 9e2177567..c6750c993 100644 --- a/pkg/job/runtime_v2/job/paddle/kube_paddle_job_test.go +++ b/pkg/job/runtime_v2/job/paddle/kube_paddle_job_test.go @@ -342,6 +342,40 @@ func TestPaddleJob_CreateJob(t *testing.T) { }, wantErr: errors.New("worker is required in paddleJob"), }, + { + caseName: "extensionTemplate NilWorker", + jobObj: &api.PFJob{ + ID: "job-normal-0c272d0b", + Name: "", + Namespace: "default", + JobType: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + JobMode: schema.EnvJobModeCollective, + UserName: "root", + QueueID: "mockQueueID", + Conf: schema.Conf{ + Name: "normal", + Command: "sleep 200", + Image: "mockImage", + Flavour: schema.Flavour{Name: "mockFlavourName", ResourceInfo: schema.ResourceInfo{CPU: "3", Mem: "3"}}, + }, + Tasks: []schema.Member{ + { + ID: "task-normal-0001", + Replicas: 3, + Role: schema.RoleWorker, + Conf: schema.Conf{ + Name: "normal", + Command: "sleep 200", + Image: "mockImage", + Flavour: schema.Flavour{Name: "cpu", ResourceInfo: schema.ResourceInfo{CPU: "-2", Mem: "2"}}, + }, + }, + }, + ExtensionTemplate: []byte(extensionPaddleYaml), + }, + wantErr: errors.New("negative resources not permitted: map[cpu:-2 memory:2]"), + }, { caseName: "extensionTemplate NilPS", jobObj: &api.PFJob{ From 6700f1bc769a504102c98b376fec19196e6889f0 Mon Sep 17 00:00:00 2001 From: zhongzichao Date: Thu, 24 Nov 2022 13:29:32 +0800 Subject: [PATCH 09/10] add paddle job resource support when using extensiontemplate --- .../runtime_v2/job/paddle/kube_paddle_job.go | 4 ++ .../job/paddle/kube_paddle_job_test.go | 43 ++++++++++++++++--- 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/pkg/job/runtime_v2/job/paddle/kube_paddle_job.go b/pkg/job/runtime_v2/job/paddle/kube_paddle_job.go index cd3ec61b0..1413564d7 100644 --- a/pkg/job/runtime_v2/job/paddle/kube_paddle_job.go +++ b/pkg/job/runtime_v2/job/paddle/kube_paddle_job.go @@ -397,12 +397,16 @@ func (pj *KubePaddleJob) patchResource(pdj *paddlejobv1.PaddleJob, job *api.PFJo log.Infof("patch resource in paddlejob") if len(job.Tasks) == 0 { log.Debugf("no resources to be configured") + return nil } // fill resource var minAvailable int32 minResources := resources.EmptyResource() for _, task := range job.Tasks { + if pfschema.IsEmptyResource(task.Flavour.ResourceInfo) { + continue + } resourceRequirements, err := kuberuntime.GenerateResourceRequirements(task.Flavour) if err != nil { log.Errorf("generate resource requirements failed, err: %v", err) diff --git a/pkg/job/runtime_v2/job/paddle/kube_paddle_job_test.go b/pkg/job/runtime_v2/job/paddle/kube_paddle_job_test.go index c6750c993..0f1e8660c 100644 --- a/pkg/job/runtime_v2/job/paddle/kube_paddle_job_test.go +++ b/pkg/job/runtime_v2/job/paddle/kube_paddle_job_test.go @@ -65,11 +65,11 @@ spec: image: nginx resources: limits: - cpu: "1" - memory: 1Gi + cpu: "2" + memory: 2Gi requests: - cpu: "1" - memory: 1Gi + cpu: "2" + memory: 2Gi ` nilWorkerContainerYaml = ` apiVersion: batch.paddlepaddle.org/v1 @@ -511,6 +511,38 @@ func TestPaddleJob_CreateJob(t *testing.T) { }, wantErr: errors.New("container is required in paddleJob"), }, + { + caseName: "extensionTemplate member has no flavour", + jobObj: &api.PFJob{ + ID: "job-normal-0c272d0c", + Name: "", + Namespace: "default", + JobType: schema.TypeDistributed, + Framework: schema.FrameworkPaddle, + JobMode: schema.EnvJobModePS, + UserName: "root", + QueueID: "mockQueueID", + Tasks: []schema.Member{ + { + ID: "task-normal-0001", + Replicas: 1, + Role: schema.RoleWorker, + Conf: schema.Conf{ + Name: "normal", + Command: "sleep 200", + Image: "mockImage", + }, + }, + { + ID: "task-normal-0001", + Replicas: 1, + Role: schema.RolePServer, + }, + }, + ExtensionTemplate: []byte(extensionPaddleYaml), + }, + wantErr: nil, + }, { caseName: "create paddle job with Collective mode", jobObj: &mockPaddleJob, @@ -533,10 +565,11 @@ func TestPaddleJob_CreateJob(t *testing.T) { if test.wantErr == nil { assert.Equal(t, test.wantErr, err) t.Logf("case[%s] to CreateJob, paddleFlowJob=%+v", test.caseName, test.jobObj) - _, err := kubeRuntimeClient.Get(test.jobObj.Namespace, test.jobObj.ID, KubePaddleFwVersion) + obj, err := kubeRuntimeClient.Get(test.jobObj.Namespace, test.jobObj.ID, KubePaddleFwVersion) if !assert.NoError(t, err) { t.Errorf(err.Error()) } + t.Logf("result: %v", obj) } else { if assert.Error(t, err) { assert.Equal(t, test.wantErr.Error(), err.Error()) From 1b3a18e275a592be382f69af9806fac53d6dc22a Mon Sep 17 00:00:00 2001 From: zhongzichao Date: Thu, 24 Nov 2022 14:28:14 +0800 Subject: [PATCH 10/10] add paddle job resource support when using extensiontemplate --- pkg/apiserver/controller/job/create.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/apiserver/controller/job/create.go b/pkg/apiserver/controller/job/create.go index a7aab2f17..490649e16 100644 --- a/pkg/apiserver/controller/job/create.go +++ b/pkg/apiserver/controller/job/create.go @@ -163,7 +163,6 @@ func validateMembersResource(ctx *logger.RequestContext, request *CreateJobInfo) return err } request.Members[index].Flavour.ResourceInfo = member.Flavour.ResourceInfo - // sum = sum + member.Replicas * member.Flavour.ResourceInfo memberRes, err := resources.NewResourceFromMap(member.Flavour.ResourceInfo.ToMap()) if err != nil { ctx.Logging().Errorf("Failed to multiply replicas=%d and resourceInfo=%v, err: %v", member.Replicas, member.Flavour.ResourceInfo, err)