Skip to content

add paddle job resource support when using extensiontemplate #944

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Nov 24, 2022
102 changes: 98 additions & 4 deletions pkg/apiserver/controller/job/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,27 @@ func validateJob(ctx *logger.RequestContext, request *CreateJobInfo) error {
}

if len(request.ExtensionTemplate) != 0 {
// extension template from user
ctx.Logging().Infof("request ExtensionTemplate is not empty, pass validate members")
// validate extension template from user
if len(request.Members) == 0 {
ctx.Logging().Infof("request ExtensionTemplate pass validate nil members")
return nil
}
// todo validateMembers using these function
// members not nil, continue validate
if err := validateMembersRole(ctx, request); err != nil {
ctx.Logging().Errorf("validate members role failed, err: %v", err)
return err
}
// validate scheduleInfo in members
if err := validateMembersScheduleInfo(ctx, request); err != nil {
ctx.Logging().Errorf("validate members role failed, err: %v", err)
return err
}
// validate resource in members
if err := validateMembersResource(ctx, request); err != nil {
ctx.Logging().Errorf("validate members role failed, err: %v", err)
return err
}
} else {
// validate members
if err := validateJobMembers(ctx, request); err != nil {
Expand All @@ -116,6 +135,58 @@ func validateJob(ctx *logger.RequestContext, request *CreateJobInfo) error {
return nil
}

// validateMembersScheduleInfo checks the scheduling settings of every member in
// request.Members against the job-level request.SchedulingPolicy: first the
// member's queue (validateMembersQueue), then the member's priority
// (checkPriority). It returns the first error encountered, or nil when every
// member passes both checks.
func validateMembersScheduleInfo(ctx *logger.RequestContext, request *CreateJobInfo) error {
	// Iterate by index so the helpers receive pointers into request.Members
	// instead of pointers to a per-iteration copy; anything they write through
	// the pointer then stays on the request rather than being discarded.
	for i := range request.Members {
		member := &request.Members[i]
		// validate queue
		if err := validateMembersQueue(ctx, member, request.SchedulingPolicy); err != nil {
			ctx.Logging().Errorf("Failed to check Members' Queue: %v", err)
			return err
		}
		// check members priority
		if err := checkPriority(&member.SchedulingPolicy, &request.SchedulingPolicy); err != nil {
			ctx.Logging().Errorf("Failed to check priority: %v", err)
			return err
		}
	}
	return nil
}

// validateMembersResource resolves the flavour of every member, writes the
// resolved ResourceInfo back onto request.Members, and checks that the total
// requested resource (per-member resource multiplied by Replicas, summed over
// all members) does not exceed request.SchedulingPolicy.MaxResources.
//
// It returns an error when a flavour cannot be resolved, when its resource
// info cannot be converted into a resource, when a member's CPU or memory is
// zero, or when the summed resource is larger than MaxResources.
func validateMembersResource(ctx *logger.RequestContext, request *CreateJobInfo) error {
	sumResource := resources.EmptyResource()
	for index, member := range request.Members {
		memberFlavour, err := flavour.GetFlavourWithCheck(member.Flavour)
		if err != nil {
			ctx.Logging().Errorf("get flavour failed, err:%v", err)
			return err
		}
		// Only the resolved ResourceInfo is persisted on the request; the rest
		// of the member's Flavour is left as submitted.
		request.Members[index].Flavour.ResourceInfo = memberFlavour.ResourceInfo

		memberRes, err := resources.NewResourceFromMap(memberFlavour.ResourceInfo.ToMap())
		if err != nil {
			ctx.Logging().Errorf("Failed to build resource from resourceInfo=%v (replicas=%d), err: %v",
				memberFlavour.ResourceInfo, member.Replicas, err)
			ctx.ErrorCode = common.JobInvalidField
			return err
		}
		ctx.Logging().Debugf("member resource info %v", memberFlavour.ResourceInfo)
		if memberRes.CPU() == 0 || memberRes.Memory() == 0 {
			err = fmt.Errorf("flavour[%v] cpu or memory is empty", memberRes)
			ctx.Logging().Errorf("Failed to check flavour: %v", err)
			return err
		}
		// Scale the single-replica resource by the replica count before
		// adding it to the job-wide total.
		memberRes.Multi(member.Replicas)
		sumResource.Add(memberRes)
	}
	// validate queue and total-member-resource
	if !sumResource.LessEqual(request.SchedulingPolicy.MaxResources) {
		// Build the error with a constant format string; the formatted text
		// must never itself be used as a format string.
		err := fmt.Errorf("the flavour[%+v] is larger than queue's [%+v]", sumResource, request.SchedulingPolicy.MaxResources)
		ctx.Logging().Errorf("%v", err)
		return err
	}
	return nil
}

func validateCommonJobInfo(ctx *logger.RequestContext, requestCommonJobInfo *CommonJobInfo) error {
// validate job id
if requestCommonJobInfo.ID != "" {
Expand Down Expand Up @@ -146,6 +217,28 @@ func validateCommonJobInfo(ctx *logger.RequestContext, requestCommonJobInfo *Com
return nil
}

// validateMembersRole verifies that every member's role is one of the roles
// returned by getFrameworkRoles for request.Framework, accumulates the replica
// count per role, and then hands the per-role totals to checkMemberRole, whose
// first result is stored in request.Mode.
//
// An unsupported role or a checkMemberRole failure is logged and returned.
func validateMembersRole(ctx *logger.RequestContext, request *CreateJobInfo) error {
	log.Infof("validate job %s MembersRole", request.Name)

	roleReplicas := getFrameworkRoles(request.Framework)
	for _, m := range request.Members {
		role := schema.MemberRole(m.Role)
		if _, supported := roleReplicas[role]; !supported {
			err := fmt.Errorf("the role[%s] for framework %s is not supported", m.Role, request.Framework)
			ctx.Logging().Errorf("Failed to check Members' role, err: %v", err)
			return err
		}
		roleReplicas[role] += m.Replicas
	}

	mode, err := checkMemberRole(request.Framework, roleReplicas)
	// request.Mode receives the returned value before the error is inspected,
	// i.e. it is written on the failure path as well.
	request.Mode = mode
	if err != nil {
		ctx.Logging().Errorf("check member role for framework %s failed, err: %v", request.Framework, err)
		return err
	}
	return nil
}

func validateJobMembers(ctx *logger.RequestContext, request *CreateJobInfo) error {
if len(request.Members) == 0 {
err := fmt.Errorf("request.Members is empty")
Expand Down Expand Up @@ -487,9 +580,10 @@ func buildJob(request *CreateJobInfo) (*model.Job, error) {
var members []schema.Member
var templateJson string
var err error
if len(request.ExtensionTemplate) == 0 {
if len(request.Members) != 0 {
members = buildMembers(request)
} else {
}
if len(request.ExtensionTemplate) != 0 {
templateJson, err = newExtensionTemplateJson(request.ExtensionTemplate)
if err != nil {
log.Errorf("parse extension template failed, err: %v", err)
Expand Down
Loading