
Commit 50bec09

Fix bug that can't distinguish standalone job (#828)
* Fix bug that can't distinguish standalone job
* Fix bug that update priority for single job failed
* filter pod with labels owner
1 parent: f86927c

File tree: 9 files changed, +78 −26 lines
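Taken together, these changes label every paddleflow-created pod with an owner and a job ID, and additionally mark the pod of a standalone (single) job with a framework label, so controllers can tell it apart from a pod that merely belongs to a distributed job. Below is a minimal sketch of that label rule, assuming the FrameworkStandalone constant stringifies to "standalone"; the isStandaloneJobPod helper is illustrative and not part of this commit.

package main

import "fmt"

// Label keys and values mirroring the constants in pkg/common/schema/job.go.
const (
    JobOwnerLabel     = "owner"
    JobOwnerValue     = "paddleflow"
    JobIDLabel        = "paddleflow-job-id"
    JobLabelFramework = "paddleflow-job-framework"

    // Assumption: schema.FrameworkStandalone stringifies to "standalone".
    frameworkStandalone = "standalone"
)

// isStandaloneJobPod is an illustrative helper (not part of this commit) that applies
// the rule responsibleForJob now uses for Pod objects: the pod must be owned by
// paddleflow AND carry the standalone framework mark.
func isStandaloneJobPod(labels map[string]string) bool {
    if labels == nil || labels[JobOwnerLabel] != JobOwnerValue {
        return false
    }
    return labels[JobLabelFramework] == frameworkStandalone
}

func main() {
    // Labels a standalone (single) job pod carries after this commit:
    // patchTaskMetadata sets owner and job ID, patchSinglePodVariable adds the framework mark.
    standalonePod := map[string]string{
        JobOwnerLabel:     JobOwnerValue,
        JobIDLabel:        "job-abc123",
        JobLabelFramework: frameworkStandalone,
    }
    // A task pod of a distributed job gets owner and job ID, but no standalone mark.
    distributedTaskPod := map[string]string{
        JobOwnerLabel: JobOwnerValue,
        JobIDLabel:    "job-def456",
    }
    fmt.Println(isStandaloneJobPod(standalonePod))      // true
    fmt.Println(isStandaloneJobPod(distributedTaskPod)) // false
}

This is the same check responsibleForJob applies below for Pod objects, which is how the commit distinguishes standalone jobs from task pods of distributed jobs.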

pkg/common/schema/job.go

+5 −4

@@ -133,10 +133,11 @@ const (
 	PriorityClassHigh     = "high"
 	PriorityClassVeryHigh = "very-high"
 
-	JobOwnerLabel = "owner"
-	JobOwnerValue = "paddleflow"
-	JobIDLabel    = "paddleflow-job-id"
-	JobTTLSeconds = "padleflow/job-ttl-seconds"
+	JobOwnerLabel     = "owner"
+	JobOwnerValue     = "paddleflow"
+	JobIDLabel        = "paddleflow-job-id"
+	JobTTLSeconds     = "padleflow/job-ttl-seconds"
+	JobLabelFramework = "paddleflow-job-framework"
 
 	VolcanoJobNameLabel = "volcano.sh/job-name"
 	QueueLabelKey       = "volcano.sh/queue-name"

pkg/job/runtime/kubernetes/controller/job_sync.go

+8 −2

@@ -359,12 +359,18 @@ func (j *JobSync) syncTaskStatus(taskSyncInfo *TaskSyncInfo) error {
 
 func responsibleForJob(obj interface{}) bool {
 	job := obj.(*unstructured.Unstructured)
+	jobName := job.GetName()
 	labels := job.GetLabels()
+	gvk := job.GroupVersionKind()
 	if labels != nil && labels[commonschema.JobOwnerLabel] == commonschema.JobOwnerValue {
-		log.Debugf("responsible for handle job. jobName:[%s]", job.GetName())
+		if gvk == k8s.PodGVK && labels[commonschema.JobLabelFramework] != string(commonschema.FrameworkStandalone) {
+			log.Debugf("job %s is not single job", jobName)
+			return false
+		}
+		log.Debugf("responsible for handle job. jobName:[%s]", jobName)
 		return true
 	}
-	log.Debugf("responsible for skip job. jobName:[%s]", job.GetName())
+	log.Debugf("responsible for skip job. jobName:[%s]", jobName)
 	return false
 }
 

pkg/job/runtime/kubernetes/controller/job_sync_handler.go

+16 −16

@@ -335,26 +335,26 @@ func getJobByTask(obj *unstructured.Unstructured) string {
 		log.Errorf("get job by task failed, obj is nil")
 		return ""
 	}
-	name := obj.GetName()
-	namespace := obj.GetNamespace()
 	labels := obj.GetLabels()
 	ownerReferences := obj.GetOwnerReferences()
 
-	if len(ownerReferences) == 0 {
-		// get job name for single job
-		if labels != nil && labels[schema.JobOwnerLabel] == schema.JobOwnerValue {
-			return name
-		} else {
-			log.Debugf("pod %s/%s not belong to paddlefow job, skip it.", namespace, name)
-			return ""
+	if labels == nil || labels[schema.JobOwnerLabel] != schema.JobOwnerValue {
+		return ""
+	}
+	// 1. get job name from ownerReferences, including workflow, PaddleJob
+	if len(ownerReferences) > 0 {
+		// get job name for distributed job
+		ownerReference := ownerReferences[0]
+		gvk := k8sschema.FromAPIVersionAndKind(ownerReference.APIVersion, ownerReference.Kind)
+		_, find := k8s.GVKJobStatusMap[gvk]
+		if find {
+			return ownerReference.Name
 		}
 	}
-	// get job name for distributed job
-	ownerReference := ownerReferences[0]
-	gvk := k8sschema.FromAPIVersionAndKind(ownerReference.APIVersion, ownerReference.Kind)
-	_, find := k8s.GVKJobStatusMap[gvk]
-	if !find {
-		return ""
+	// 2. get job name from pod labels
+	jobName, find := labels[schema.JobIDLabel]
+	if find {
+		return jobName
 	}
-	return ownerReference.Name
+	return ""
 }
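The rewritten getJobByTask resolves a task's job in two steps: a recognized ownerReference wins (distributed jobs such as PaddleJob), otherwise it falls back to the paddleflow-job-id label (standalone pods have no owner object). A simplified, self-contained sketch of that resolution order follows; knownJobGVKs and jobNameForTask are stand-ins for PaddleFlow's k8s.GVKJobStatusMap and getJobByTask, the sketch checks all ownerReferences rather than only the first, and the PaddleJob apiVersion shown is only an example.

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    k8sschema "k8s.io/apimachinery/pkg/runtime/schema"
)

// knownJobGVKs stands in for PaddleFlow's k8s.GVKJobStatusMap: owner kinds that
// are themselves tracked as jobs. The PaddleJob GVK here is only an example.
var knownJobGVKs = map[k8sschema.GroupVersionKind]bool{
    {Group: "batch.paddlepaddle.org", Version: "v1", Kind: "PaddleJob"}: true,
}

// jobNameForTask is a simplified stand-in for the rewritten getJobByTask:
// require the paddleflow owner label, prefer a recognized ownerReference
// (distributed jobs), otherwise fall back to the job-id label (standalone pods).
func jobNameForTask(obj *unstructured.Unstructured) string {
    labels := obj.GetLabels()
    if labels == nil || labels["owner"] != "paddleflow" {
        return ""
    }
    for _, ref := range obj.GetOwnerReferences() {
        if knownJobGVKs[k8sschema.FromAPIVersionAndKind(ref.APIVersion, ref.Kind)] {
            return ref.Name
        }
    }
    return labels["paddleflow-job-id"]
}

func main() {
    // Task pod of a distributed job: resolved through its ownerReference.
    distPod := &unstructured.Unstructured{Object: map[string]interface{}{}}
    distPod.SetLabels(map[string]string{"owner": "paddleflow", "paddleflow-job-id": "job-dist-1"})
    distPod.SetOwnerReferences([]metav1.OwnerReference{{
        APIVersion: "batch.paddlepaddle.org/v1", Kind: "PaddleJob", Name: "job-dist-1",
    }})

    // Standalone job pod: no ownerReferences, resolved through the job-id label.
    singlePod := &unstructured.Unstructured{Object: map[string]interface{}{}}
    singlePod.SetLabels(map[string]string{"owner": "paddleflow", "paddleflow-job-id": "job-single-1"})

    fmt.Println(jobNameForTask(distPod))   // job-dist-1
    fmt.Println(jobNameForTask(singlePod)) // job-single-1
}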

pkg/job/runtime/kubernetes/executor/kube_builtin_job.go

+1 −1

@@ -63,7 +63,7 @@ func (j *KubeJob) setPodTemplateSpec(podSpec *corev1.PodTemplateSpec, task *sche
 	if podSpec == nil || task == nil {
 		return fmt.Errorf("podTemplateSpec or task is nil")
 	}
-	// TODO: set pod metadata
+	j.patchTaskMetadata(&podSpec.ObjectMeta, *task)
 	// set SchedulerName
 	podSpec.Spec.SchedulerName = config.GlobalServerConfig.Job.SchedulerName
 	// TODO: remove hard coded schedulerName when upstream package is fixed

pkg/job/runtime/kubernetes/executor/kubernetes_job.go

+4 −0

@@ -553,6 +553,7 @@ func (j *KubeJob) patchTaskMetadata(metadata *metav1.ObjectMeta, member schema.M
 	metadata.Annotations = j.appendAnnotationsIfAbsent(metadata.Annotations, member.Annotations)
 	metadata.Labels = j.appendLabelsIfAbsent(metadata.Labels, member.Labels)
 	metadata.Labels[schema.JobIDLabel] = j.ID
+	metadata.Labels[schema.JobOwnerLabel] = schema.JobOwnerValue
 }
 
 func (j *KubeJob) fillPodTemplateSpec(pod *corev1.PodTemplateSpec, member schema.Member) error {
@@ -647,6 +648,9 @@ func GetPodGroupName(jobID string) string {
 		if anno != nil {
 			pgName = anno[schedulingv1beta1.KubeGroupNameAnnotationKey]
 		}
+		if pgName == "" {
+			pgName = fmt.Sprintf("podgroup-%s", jobObj.GetUID())
+		}
 	default:
 		log.Warningf("the framework[%s] of job is not supported", job.Framework)
 		pgName = jobID

pkg/job/runtime/kubernetes/executor/pytorch_job.go

+14 −1

@@ -108,12 +108,25 @@ func (pj *PyTorchJob) validateCustomYaml(torchJobSpec *pytorchv1.PyTorchJobSpec)
 
 // customPyTorchJobSpec set custom PyTorchJob Spec
 func (pj *PyTorchJob) customPyTorchJobSpec(torchJobSpec *pytorchv1.PyTorchJobSpec) error {
+	if torchJobSpec == nil || torchJobSpec.PyTorchReplicaSpecs == nil {
+		err := fmt.Errorf("build custom %s failed, PyTorchJobSpec or PyTorchReplicaSpecs is nil", pj.String())
+		log.Errorf("%v", err)
+		return err
+	}
 	log.Debugf("patch %s spec:%#v", pj.String(), torchJobSpec)
 	err := pj.validateCustomYaml(torchJobSpec)
 	if err != nil {
 		return err
 	}
-	// TODO: patch pytorch job from user
+	// patch metadata
+	ps, find := torchJobSpec.PyTorchReplicaSpecs[pytorchv1.PyTorchReplicaTypeMaster]
+	if find && ps != nil {
+		pj.patchTaskMetadata(&ps.Template.ObjectMeta, schema.Member{})
+	}
+	worker, find := torchJobSpec.PyTorchReplicaSpecs[pytorchv1.PyTorchReplicaTypeWorker]
+	if find && worker != nil {
+		pj.patchTaskMetadata(&worker.Template.ObjectMeta, schema.Member{})
+	}
 	// check RunPolicy
 	return nil
 }

pkg/job/runtime/kubernetes/executor/ray_job.go

+10 −0

@@ -240,11 +240,21 @@ func (j *RayJob) validateCustomYaml(rayJobSpec *rayV1alpha1.RayJobSpec) error {
 
 // customPyTorchJobSpec set custom PyTorchJob Spec
 func (j *RayJob) customRayJobSpec(rayJobSpec *rayV1alpha1.RayJobSpec) error {
+	if rayJobSpec == nil {
+		err := fmt.Errorf("build custom %s failed, RayJobSpec is nil", j.String())
+		log.Errorf("%v", err)
+		return err
+	}
 	log.Debugf("patch %s spec:%#v", j.String(), rayJobSpec)
 	err := j.validateCustomYaml(rayJobSpec)
 	if err != nil {
 		return err
 	}
+	// patch metadata
+	j.patchTaskMetadata(&rayJobSpec.RayClusterSpec.HeadGroupSpec.Template.ObjectMeta, schema.Member{})
+	for i := range rayJobSpec.RayClusterSpec.WorkerGroupSpecs {
+		j.patchTaskMetadata(&rayJobSpec.RayClusterSpec.WorkerGroupSpecs[i].Template.ObjectMeta, schema.Member{})
+	}
 	// TODO: patch ray job from user
 	// check RunPolicy
 	return nil

pkg/job/runtime/kubernetes/executor/singlejob.go

+6 −1

@@ -80,7 +80,12 @@ func (sp *SingleJob) patchSinglePodVariable(pod *v1.Pod, jobID string) error {
 		priorityClass := sp.getPriorityClass()
 		pod.Spec.PriorityClassName = priorityClass
 	}
-	sp.fillPodSpec(&pod.Spec, nil)
+	// set single job mark
+	pod.Labels[schema.JobLabelFramework] = string(schema.FrameworkStandalone)
+	if err := sp.fillPodSpec(&pod.Spec, nil); err != nil {
+		log.Errorf("single job fillPodSpec failed, err: %v", err)
+		return err
+	}
 
 	// file container
 	if err := sp.fillContainersInPod(pod); err != nil {

pkg/job/runtime/kubernetes/executor/tensorflow_job.go

+14 −1

@@ -114,11 +114,24 @@ func (j *TFJob) validateCustomYaml(tfSpec *tfv1.TFJobSpec) error {
 // customPyTorchJobSpec set custom PyTorchJob Spec
 func (j *TFJob) customTFJobSpec(tfJobSpec *tfv1.TFJobSpec) error {
 	log.Debugf("patch %s spec:%#v", j.String(), tfJobSpec)
+	if tfJobSpec == nil || tfJobSpec.TFReplicaSpecs == nil {
+		err := fmt.Errorf("build custom %s failed, TFJobSpec or TFReplicaSpecs is nil", j.String())
+		log.Errorf("%v", err)
+		return err
+	}
 	err := j.validateCustomYaml(tfJobSpec)
 	if err != nil {
 		return err
 	}
-	// TODO: patch tf job from user
+	// patch metadata
+	ps, find := tfJobSpec.TFReplicaSpecs[tfv1.TFReplicaTypePS]
+	if find && ps != nil {
+		j.patchTaskMetadata(&ps.Template.ObjectMeta, schema.Member{})
+	}
+	worker, find := tfJobSpec.TFReplicaSpecs[tfv1.TFReplicaTypeWorker]
+	if find && worker != nil {
+		j.patchTaskMetadata(&worker.Template.ObjectMeta, schema.Member{})
+	}
 	// check RunPolicy
 	return nil
 }
