Skip to content

Commit e65c96a

Browse files
authored
feat(job): refactor runtime job struct (#965)
* refactor runtime job struct
* refactor job event handler
* rename parameters
1 parent 81e1fab commit e65c96a

File tree

9 files changed

+162
-815
lines changed

9 files changed

+162
-815
lines changed

pkg/job/runtime_v2/job/argoworkflow/kube_workflow_job.go

Lines changed: 4 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ import (
2424
log "github.com/sirupsen/logrus"
2525
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2626
"k8s.io/apimachinery/pkg/runtime"
27-
"k8s.io/apimachinery/pkg/runtime/schema"
28-
"k8s.io/client-go/tools/cache"
2927
"k8s.io/client-go/util/workqueue"
3028

3129
"github.com/PaddlePaddle/PaddleFlow/pkg/common/k8s"
@@ -43,24 +41,15 @@ var (
4341

4442
// KubeArgoWorkflowJob is a struct that runs an argo workflow
4543
type KubeArgoWorkflowJob struct {
46-
GVK schema.GroupVersionKind
47-
frameworkVersion pfschema.FrameworkVersion
48-
runtimeClient framework.RuntimeClientInterface
49-
jobQueue workqueue.RateLimitingInterface
44+
kuberuntime.KubeBaseJob
5045
}
5146

5247
func New(kubeClient framework.RuntimeClientInterface) framework.JobInterface {
5348
return &KubeArgoWorkflowJob{
54-
runtimeClient: kubeClient,
55-
GVK: JobGVK,
56-
frameworkVersion: KubeArgoWorkflowFwVersion,
49+
KubeBaseJob: kuberuntime.NewKubeBaseJob(JobGVK, KubeArgoWorkflowFwVersion, kubeClient),
5750
}
5851
}
5952

60-
func (pj *KubeArgoWorkflowJob) String(name string) string {
61-
return fmt.Sprintf("%s job %s on %s", pj.GVK.String(), name, pj.runtimeClient.Cluster())
62-
}
63-
6453
func (pj *KubeArgoWorkflowJob) Submit(ctx context.Context, job *api.PFJob) error {
6554
if job == nil {
6655
return fmt.Errorf("job is nil")
@@ -88,7 +77,7 @@ func (pj *KubeArgoWorkflowJob) Submit(ctx context.Context, job *api.PFJob) error
8877
return err
8978
}
9079
log.Debugf("begin to create %s, job info: %v", pj.String(jobName), argoWfJob)
91-
err = pj.runtimeClient.Create(argoWfJob, pj.frameworkVersion)
80+
err = pj.RuntimeClient.Create(argoWfJob, pj.FrameworkVersion)
9281
if err != nil {
9382
log.Errorf("create %s failed, err %v", pj.String(jobName), err)
9483
return err
@@ -101,102 +90,17 @@ func (pj *KubeArgoWorkflowJob) customTFJobSpec(spec *wfv1.WorkflowSpec, job *api
10190
return nil
10291
}
10392

104-
func (pj *KubeArgoWorkflowJob) Stop(ctx context.Context, job *api.PFJob) error {
105-
if job == nil {
106-
return fmt.Errorf("job is nil")
107-
}
108-
jobName := job.NamespacedName()
109-
log.Infof("begin to stop %s", pj.String(jobName))
110-
if err := pj.runtimeClient.Delete(job.Namespace, job.ID, pj.frameworkVersion); err != nil {
111-
log.Errorf("stop %s failed, err: %v", pj.String(jobName), err)
112-
return err
113-
}
114-
return nil
115-
}
116-
117-
func (pj *KubeArgoWorkflowJob) Update(ctx context.Context, job *api.PFJob) error {
118-
if job == nil {
119-
return fmt.Errorf("job is nil")
120-
}
121-
jobName := job.NamespacedName()
122-
log.Infof("begin to update %s", pj.String(jobName))
123-
if err := kuberuntime.UpdateKubeJob(job, pj.runtimeClient, pj.frameworkVersion); err != nil {
124-
log.Errorf("update %s failed, err: %v", pj.String(jobName), err)
125-
return err
126-
}
127-
return nil
128-
}
129-
130-
func (pj *KubeArgoWorkflowJob) Delete(ctx context.Context, job *api.PFJob) error {
131-
if job == nil {
132-
return fmt.Errorf("job is nil")
133-
}
134-
jobName := job.NamespacedName()
135-
log.Infof("begin to delete %s ", pj.String(jobName))
136-
if err := pj.runtimeClient.Delete(job.Namespace, job.ID, pj.frameworkVersion); err != nil {
137-
log.Errorf("delete %s failed, err %v", pj.String(jobName), err)
138-
return err
139-
}
140-
return nil
141-
}
142-
143-
func (pj *KubeArgoWorkflowJob) GetLog(ctx context.Context, jobLogRequest pfschema.JobLogRequest) (pfschema.JobLogInfo, error) {
144-
// TODO: add get log logic
145-
return pfschema.JobLogInfo{}, nil
146-
}
147-
14893
func (pj *KubeArgoWorkflowJob) AddEventListener(ctx context.Context, listenerType string, jobQueue workqueue.RateLimitingInterface, listener interface{}) error {
14994
var err error
15095
switch listenerType {
15196
case pfschema.ListenerTypeJob:
152-
err = pj.addJobEventListener(ctx, jobQueue, listener)
97+
err = pj.AddJobEventListener(ctx, jobQueue, listener, pj.JobStatus, nil)
15398
default:
15499
err = fmt.Errorf("listenerType %s is not supported", listenerType)
155100
}
156101
return err
157102
}
158103

159-
func (pj *KubeArgoWorkflowJob) addJobEventListener(ctx context.Context, jobQueue workqueue.RateLimitingInterface, listener interface{}) error {
160-
if jobQueue == nil || listener == nil {
161-
return fmt.Errorf("add job event listener failed, err: listener is nil")
162-
}
163-
pj.jobQueue = jobQueue
164-
informer := listener.(cache.SharedIndexInformer)
165-
informer.AddEventHandler(cache.FilteringResourceEventHandler{
166-
FilterFunc: kuberuntime.ResponsibleForJob,
167-
Handler: cache.ResourceEventHandlerFuncs{
168-
AddFunc: pj.addJob,
169-
UpdateFunc: pj.updateJob,
170-
DeleteFunc: pj.deleteJob,
171-
},
172-
})
173-
return nil
174-
}
175-
176-
func (pj *KubeArgoWorkflowJob) addJob(obj interface{}) {
177-
jobSyncInfo, err := kuberuntime.JobAddFunc(obj, pj.JobStatus)
178-
if err != nil {
179-
return
180-
}
181-
pj.jobQueue.Add(jobSyncInfo)
182-
}
183-
184-
func (pj *KubeArgoWorkflowJob) updateJob(old, new interface{}) {
185-
jobSyncInfo, err := kuberuntime.JobUpdateFunc(old, new, pj.JobStatus)
186-
if err != nil {
187-
return
188-
}
189-
pj.jobQueue.Add(jobSyncInfo)
190-
}
191-
192-
func (pj *KubeArgoWorkflowJob) deleteJob(obj interface{}) {
193-
jobSyncInfo, err := kuberuntime.JobDeleteFunc(obj, pj.JobStatus)
194-
if err != nil {
195-
return
196-
}
197-
pj.jobQueue.Add(jobSyncInfo)
198-
}
199-
200104
// JobStatus gets the status info of an Argo Workflow job, including origin status, pf status and message.
201105
func (pj *KubeArgoWorkflowJob) JobStatus(obj interface{}) (api.StatusInfo, error) {
202106
unObj := obj.(*unstructured.Unstructured)

pkg/job/runtime_v2/job/mpi/kube_mpi_job.go

Lines changed: 4 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ import (
2525
log "github.com/sirupsen/logrus"
2626
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2727
"k8s.io/apimachinery/pkg/runtime"
28-
"k8s.io/apimachinery/pkg/runtime/schema"
29-
"k8s.io/client-go/tools/cache"
3028
"k8s.io/client-go/util/workqueue"
3129

3230
"github.com/PaddlePaddle/PaddleFlow/pkg/common/k8s"
@@ -45,24 +43,15 @@ var (
4543

4644
// KubeMPIJob is a struct that runs a mpi job
4745
type KubeMPIJob struct {
48-
GVK schema.GroupVersionKind
49-
frameworkVersion pfschema.FrameworkVersion
50-
runtimeClient framework.RuntimeClientInterface
51-
jobQueue workqueue.RateLimitingInterface
46+
kuberuntime.KubeBaseJob
5247
}
5348

5449
func New(kubeClient framework.RuntimeClientInterface) framework.JobInterface {
5550
return &KubeMPIJob{
56-
runtimeClient: kubeClient,
57-
GVK: JobGVK,
58-
frameworkVersion: KubeMPIFwVersion,
51+
KubeBaseJob: kuberuntime.NewKubeBaseJob(JobGVK, KubeMPIFwVersion, kubeClient),
5952
}
6053
}
6154

62-
func (mj *KubeMPIJob) String(name string) string {
63-
return fmt.Sprintf("%s job %s on %s", mj.GVK.String(), name, mj.runtimeClient.Cluster())
64-
}
65-
6655
func (mj *KubeMPIJob) Submit(ctx context.Context, job *api.PFJob) error {
6756
jobName := job.NamespacedName()
6857
mpiJob := &mpiv1.MPIJob{}
@@ -87,7 +76,7 @@ func (mj *KubeMPIJob) Submit(ctx context.Context, job *api.PFJob) error {
8776
return err
8877
}
8978
log.Debugf("begin to create %s, job info: %v", mj.String(jobName), mpiJob)
90-
err = mj.runtimeClient.Create(mpiJob, mj.frameworkVersion)
79+
err = mj.RuntimeClient.Create(mpiJob, mj.FrameworkVersion)
9180
if err != nil {
9281
log.Errorf("create %s failed, err %v", mj.String(jobName), err)
9382
return err
@@ -141,105 +130,17 @@ func (mj *KubeMPIJob) customMPIJobSpec(mpiJobSpec *mpiv1.MPIJobSpec, job *api.PF
141130
return kuberuntime.KubeflowRunPolicy(&mpiJobSpec.RunPolicy, nil, job.Conf.GetQueueName(), job.Conf.GetPriority())
142131
}
143132

144-
func (mj *KubeMPIJob) Stop(ctx context.Context, job *api.PFJob) error {
145-
if job == nil {
146-
return fmt.Errorf("job is nil")
147-
}
148-
jobName := job.NamespacedName()
149-
log.Infof("begin to stop %s", mj.String(jobName))
150-
if err := mj.runtimeClient.Delete(job.Namespace, job.ID, mj.frameworkVersion); err != nil {
151-
log.Errorf("stop %s failed, err: %v", mj.String(jobName), err)
152-
return err
153-
}
154-
return nil
155-
}
156-
157-
func (mj *KubeMPIJob) Update(ctx context.Context, job *api.PFJob) error {
158-
if job == nil {
159-
return fmt.Errorf("job is nil")
160-
}
161-
jobName := job.NamespacedName()
162-
log.Infof("begin to update %s", mj.String(jobName))
163-
if err := kuberuntime.UpdateKubeJob(job, mj.runtimeClient, mj.frameworkVersion); err != nil {
164-
log.Errorf("update %s failed, err: %v", mj.String(jobName), err)
165-
return err
166-
}
167-
return nil
168-
}
169-
170-
func (mj *KubeMPIJob) Delete(ctx context.Context, job *api.PFJob) error {
171-
if job == nil {
172-
return fmt.Errorf("job is nil")
173-
}
174-
jobName := job.NamespacedName()
175-
log.Infof("begin to delete %s ", mj.String(jobName))
176-
if err := mj.runtimeClient.Delete(job.Namespace, job.ID, mj.frameworkVersion); err != nil {
177-
log.Errorf("delete %s failed, err %v", mj.String(jobName), err)
178-
return err
179-
}
180-
return nil
181-
}
182-
183-
func (mj *KubeMPIJob) GetLog(ctx context.Context, jobLogRequest pfschema.JobLogRequest) (pfschema.JobLogInfo, error) {
184-
// TODO: add get log logic
185-
return pfschema.JobLogInfo{}, nil
186-
}
187-
188133
func (mj *KubeMPIJob) AddEventListener(ctx context.Context, listenerType string, jobQueue workqueue.RateLimitingInterface, listener interface{}) error {
189134
var err error
190135
switch listenerType {
191136
case pfschema.ListenerTypeJob:
192-
err = mj.addJobEventListener(ctx, jobQueue, listener)
137+
err = mj.AddJobEventListener(ctx, jobQueue, listener, mj.JobStatus, nil)
193138
default:
194139
err = fmt.Errorf("listenerType %s is not supported", listenerType)
195140
}
196141
return err
197142
}
198143

199-
func (mj *KubeMPIJob) addJobEventListener(ctx context.Context, jobQueue workqueue.RateLimitingInterface, listener interface{}) error {
200-
if jobQueue == nil || listener == nil {
201-
return fmt.Errorf("add job event listener failed, err: listener is nil")
202-
}
203-
mj.jobQueue = jobQueue
204-
informer := listener.(cache.SharedIndexInformer)
205-
informer.AddEventHandler(cache.FilteringResourceEventHandler{
206-
FilterFunc: kuberuntime.ResponsibleForJob,
207-
Handler: cache.ResourceEventHandlerFuncs{
208-
AddFunc: mj.addJob,
209-
UpdateFunc: mj.updateJob,
210-
DeleteFunc: mj.deleteJob,
211-
},
212-
})
213-
return nil
214-
}
215-
216-
func (mj *KubeMPIJob) addJob(obj interface{}) {
217-
jobSyncInfo, err := kuberuntime.JobAddFunc(obj, mj.JobStatus)
218-
if err != nil {
219-
log.Errorf("add job failed, err: %v", err)
220-
return
221-
}
222-
mj.jobQueue.Add(jobSyncInfo)
223-
}
224-
225-
func (mj *KubeMPIJob) updateJob(old, new interface{}) {
226-
jobSyncInfo, err := kuberuntime.JobUpdateFunc(old, new, mj.JobStatus)
227-
if err != nil {
228-
log.Errorf("update job failed, err: %v", err)
229-
return
230-
}
231-
mj.jobQueue.Add(jobSyncInfo)
232-
}
233-
234-
func (mj *KubeMPIJob) deleteJob(obj interface{}) {
235-
jobSyncInfo, err := kuberuntime.JobDeleteFunc(obj, mj.JobStatus)
236-
if err != nil {
237-
log.Errorf("delete job failed, err: %v", err)
238-
return
239-
}
240-
mj.jobQueue.Add(jobSyncInfo)
241-
}
242-
243144
// JobStatus gets the status info of an MPI job, including origin status, pf status and message.
244145
func (mj *KubeMPIJob) JobStatus(obj interface{}) (api.StatusInfo, error) {
245146
unObj := obj.(*unstructured.Unstructured)

0 commit comments

Comments
 (0)