Skip to content

Commit cca5bd2

Browse files
authored
feat(runtime): remove runtime v1 from job manager (#890)
* feat(runtime): remove runtime v1 from job manager * update ut for job manager
1 parent 3520589 commit cca5bd2

File tree

2 files changed

+63
-94
lines changed

2 files changed

+63
-94
lines changed

pkg/job/job_manager.go

Lines changed: 12 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package job
1818

1919
import (
2020
"fmt"
21-
"os"
2221
"sync"
2322
"time"
2423

@@ -28,7 +27,6 @@ import (
2827
"github.com/PaddlePaddle/PaddleFlow/pkg/common/config"
2928
"github.com/PaddlePaddle/PaddleFlow/pkg/common/schema"
3029
"github.com/PaddlePaddle/PaddleFlow/pkg/job/api"
31-
"github.com/PaddlePaddle/PaddleFlow/pkg/job/runtime"
3230
"github.com/PaddlePaddle/PaddleFlow/pkg/job/runtime_v2"
3331
"github.com/PaddlePaddle/PaddleFlow/pkg/metrics"
3432
"github.com/PaddlePaddle/PaddleFlow/pkg/model"
@@ -40,8 +38,6 @@ const (
4038
defaultCacheSize = 500
4139
defaultExpireTime = 30
4240
defaultJobLoop = 1
43-
// EnvRuntimeVersion contains the version of PaddleFlow runtime
44-
EnvRuntimeVersion = "PF_RUNTIME_VERSION"
4541
)
4642

4743
type ActiveClustersFunc func() []model.ClusterInfo
@@ -65,8 +61,6 @@ type JobManagerImpl struct {
6561
// clusterRuntimes contains cluster status and runtime services
6662
clusterRuntimes ClusterRuntimes
6763
clusterSyncPeriod time.Duration
68-
69-
isRuntimeV2 bool
7064
}
7165

7266
func NewJobManagerImpl() (*JobManagerImpl, error) {
@@ -109,50 +103,7 @@ func (m *JobManagerImpl) Start(activeClusters ActiveClustersFunc, activeQueueJob
109103
// init config for job manager
110104
m.init()
111105
// start job manager
112-
rVersion := os.Getenv(EnvRuntimeVersion)
113-
if rVersion == "v1" {
114-
m.startRuntime()
115-
} else {
116-
m.isRuntimeV2 = true
117-
m.startRuntimeV2()
118-
}
119-
}
120-
121-
func (m *JobManagerImpl) startRuntime() {
122-
log.Infof("Start job manager on runtime!")
123-
// submit job to cluster
124-
go m.pJobProcessLoop()
125-
126-
for {
127-
// get active clusters
128-
clusters := m.activeClusters()
129-
130-
for _, cluster := range clusters {
131-
clusterID := api.ClusterID(cluster.ID)
132-
// skip when cluster status is offline
133-
if cluster.Status == model.ClusterStatusOffLine {
134-
log.Warnf("cluster[%s] status is %s, skip it", cluster.ID, model.ClusterStatusOffLine)
135-
m.stopClusterRuntime(clusterID)
136-
continue
137-
}
138-
139-
_, find := m.clusterRuntimes.Get(clusterID)
140-
if !find {
141-
runtimeSvc, err := runtime.GetOrCreateRuntime(cluster)
142-
if err != nil {
143-
log.Errorf("new runtime for cluster[%s] failed, err: %v. skip it", cluster.ID, err)
144-
continue
145-
}
146-
log.Infof("Create new runtime with cluster <%s>", cluster.ID)
147-
148-
cr := NewClusterRuntimeInfo(cluster.Name, runtimeSvc)
149-
m.clusterRuntimes.Store(clusterID, cr)
150-
// start runtime for new cluster
151-
go m.Run(runtimeSvc, cr.StopCh, clusterID)
152-
}
153-
}
154-
time.Sleep(m.clusterSyncPeriod)
155-
}
106+
m.startRuntime()
156107
}
157108

158109
func (m *JobManagerImpl) stopClusterRuntime(clusterID api.ClusterID) {
@@ -163,21 +114,10 @@ func (m *JobManagerImpl) stopClusterRuntime(clusterID api.ClusterID) {
163114
close(cr.StopCh)
164115
}
165116
m.clusterRuntimes.Delete(clusterID)
166-
runtime.PFRuntimeMap.Delete(clusterID)
167117
runtime_v2.PFRuntimeMap.Delete(clusterID)
168118
m.stopClusterQueueSubmit(clusterID)
169119
}
170120

171-
func (m *JobManagerImpl) Run(runtimeService runtime.RuntimeService, stopCh <-chan struct{}, clusterID api.ClusterID) {
172-
log.Infof("Start %s!", runtimeService.Name())
173-
// start queue sync
174-
go runtimeService.SyncQueue(stopCh)
175-
// start job sync
176-
go runtimeService.SyncJob(stopCh)
177-
// start job gc
178-
go runtimeService.GCJob(stopCh)
179-
}
180-
181121
func (m *JobManagerImpl) pJobProcessLoop() {
182122
log.Infof("start job process loop ...")
183123
for {
@@ -245,27 +185,19 @@ func (m *JobManagerImpl) pSubmitQueueJob(jobQueue *api.JobQueue, clusterRuntime
245185
log.Infof("Leaving submit %s job in queue %s, total elapsed time: %s", job.ID, name, time.Since(startTime))
246186
} else {
247187
// TODO: add to config
248-
// time.Sleep(m.jobLoopPeriod)
249188
time.Sleep(200 * time.Millisecond)
250189
}
251190
}
252191
}
253192
}
254193

255-
func (m *JobManagerImpl) submitJob(clusterRuntime *ClusterRuntimeInfo, job *api.PFJob) {
256-
if clusterRuntime == nil || job == nil {
194+
// submitJob submits a job to the cluster
195+
func (m *JobManagerImpl) submitJob(clusterRuntime *ClusterRuntimeInfo, jobInfo *api.PFJob) {
196+
if clusterRuntime == nil || jobInfo == nil {
257197
log.Errorf("submit job to cluster failed, err: clusterRuntime or job is nil")
258198
return
259199
}
260-
if m.isRuntimeV2 {
261-
m.submitJobV1(clusterRuntime.RuntimeV2Svc.SubmitJob, job)
262-
} else {
263-
m.submitJobV1(clusterRuntime.RuntimeSvc.SubmitJob, job)
264-
}
265-
}
266200

267-
// submitJob submit a job to cluster
268-
func (m *JobManagerImpl) submitJobV1(jobSubmit func(*api.PFJob) error, jobInfo *api.PFJob) {
269201
log.Infof("begin to submit job %s to cluster", jobInfo.ID)
270202
startTime := time.Now()
271203
job, err := storage.Job.GetJobByID(jobInfo.ID)
@@ -277,7 +209,7 @@ func (m *JobManagerImpl) submitJobV1(jobSubmit func(*api.PFJob) error, jobInfo *
277209
if job.Status == schema.StatusJobInit {
278210
var jobStatus schema.JobStatus
279211
var msg string
280-
err = jobSubmit(jobInfo)
212+
err = clusterRuntime.RuntimeSvc.SubmitJob(jobInfo)
281213
if err != nil {
282214
// new job failed, update db and skip this job
283215
msg = fmt.Sprintf("submit job to cluster failed, err: %s", err)
@@ -320,8 +252,8 @@ func (m *JobManagerImpl) stopQueueSubmit(queueID api.QueueID) {
320252
}
321253

322254
// PaddleFlow runtime v2
323-
// startRuntimeV2 start job manager on runtime v2
324-
func (m *JobManagerImpl) startRuntimeV2() {
255+
// startRuntime starts the job manager on runtime v2
256+
func (m *JobManagerImpl) startRuntime() {
325257
log.Infof("Start job manager on runtime v2!")
326258
// submit job to cluster
327259
go m.pJobProcessLoop()
@@ -348,7 +280,7 @@ func (m *JobManagerImpl) startRuntimeV2() {
348280
}
349281
log.Infof("Create new runtime with cluster <%s>", cluster.ID)
350282

351-
cr := NewClusterRuntimeV2Info(cluster.Name, runtimeSvc)
283+
cr := NewClusterRuntimeInfo(cluster.Name, runtimeSvc)
352284
m.clusterRuntimes.Store(clusterID, cr)
353285
// start runtime for new cluster
354286
go runtimeSvc.SyncController(cr.StopCh)
@@ -407,28 +339,19 @@ func (m *JobManagerImpl) GetQueue(queueID api.QueueID) (*clusterQueue, bool) {
407339

408340
// ClusterRuntimeInfo defines cluster runtime
409341
type ClusterRuntimeInfo struct {
410-
Name string
411-
StopCh chan struct{}
412-
RuntimeSvc runtime.RuntimeService
413-
RuntimeV2Svc runtime_v2.RuntimeService
342+
Name string
343+
StopCh chan struct{}
344+
RuntimeSvc runtime_v2.RuntimeService
414345
}
415346

416-
func NewClusterRuntimeInfo(name string, r runtime.RuntimeService) *ClusterRuntimeInfo {
347+
func NewClusterRuntimeInfo(name string, r runtime_v2.RuntimeService) *ClusterRuntimeInfo {
417348
return &ClusterRuntimeInfo{
418349
Name: name,
419350
StopCh: make(chan struct{}),
420351
RuntimeSvc: r,
421352
}
422353
}
423354

424-
func NewClusterRuntimeV2Info(name string, r runtime_v2.RuntimeService) *ClusterRuntimeInfo {
425-
return &ClusterRuntimeInfo{
426-
Name: name,
427-
StopCh: make(chan struct{}),
428-
RuntimeV2Svc: r,
429-
}
430-
}
431-
432355
// ClusterRuntimes contains cluster runtimes
433356
type ClusterRuntimes struct {
434357
sync.RWMutex

pkg/job/job_manager_test.go

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,44 +17,90 @@ limitations under the License.
1717
package job
1818

1919
import (
20+
"reflect"
2021
"testing"
2122
"time"
2223

24+
"github.com/agiledragon/gomonkey/v2"
2325
"github.com/stretchr/testify/assert"
2426

2527
"github.com/PaddlePaddle/PaddleFlow/pkg/common/config"
28+
"github.com/PaddlePaddle/PaddleFlow/pkg/common/schema"
29+
"github.com/PaddlePaddle/PaddleFlow/pkg/job/api"
30+
runtime "github.com/PaddlePaddle/PaddleFlow/pkg/job/runtime_v2"
2631
"github.com/PaddlePaddle/PaddleFlow/pkg/model"
2732
"github.com/PaddlePaddle/PaddleFlow/pkg/storage"
2833
"github.com/PaddlePaddle/PaddleFlow/pkg/storage/driver"
2934
)
3035

36+
const (
37+
mockQueueID = "queue-test"
38+
mockClusterID = "cluster-test"
39+
)
40+
3141
func TestJobManager(t *testing.T) {
3242
config.GlobalServerConfig = &config.ServerConfig{}
3343

34-
clusterInfo := &model.ClusterInfo{
35-
Name: "test-cluster",
36-
Status: model.ClusterStatusOnLine,
44+
mockCluster := &model.ClusterInfo{
45+
Model: model.Model{
46+
ID: mockClusterID,
47+
},
48+
Name: "test-cluster",
49+
ClusterType: schema.KubernetesType,
50+
Status: model.ClusterStatusOnLine,
51+
}
52+
mockQueue := &model.Queue{
53+
Model: model.Model{
54+
ID: mockQueueID,
55+
},
56+
Status: schema.StatusQueueOpen,
57+
ClusterId: mockClusterID,
3758
}
3859
jobM, err := NewJobManagerImpl()
3960
assert.Equal(t, nil, err)
4061
testCases := []struct {
4162
name string
4263
jobManager *JobManagerImpl
64+
job *model.Job
4365
err error
4466
}{
4567
{
4668
name: "start job manager v2",
4769
jobManager: jobM,
48-
err: nil,
70+
job: &model.Job{
71+
Name: "test-job",
72+
Status: schema.StatusJobInit,
73+
QueueID: mockQueueID,
74+
Config: &schema.Conf{},
75+
},
76+
err: nil,
4977
},
5078
}
5179

5280
driver.InitMockDB()
53-
err = storage.Cluster.CreateCluster(clusterInfo)
81+
err = storage.Cluster.CreateCluster(mockCluster)
82+
assert.Equal(t, nil, err)
83+
err = storage.Queue.CreateQueue(mockQueue)
5484
assert.Equal(t, nil, err)
85+
// mock the KubeRuntime methods (Init, SyncController, SubmitJob) used by the job manager
86+
rts := &runtime.KubeRuntime{}
87+
var p1 = gomonkey.ApplyPrivateMethod(reflect.TypeOf(rts), "Init", func() error {
88+
return nil
89+
})
90+
defer p1.Reset()
91+
var p2 = gomonkey.ApplyPrivateMethod(reflect.TypeOf(rts), "SyncController", func(<-chan struct{}) error {
92+
return nil
93+
})
94+
defer p2.Reset()
95+
var p3 = gomonkey.ApplyPrivateMethod(reflect.TypeOf(rts), "SubmitJob", func(*api.PFJob) error {
96+
return nil
97+
})
98+
defer p3.Reset()
5599

56100
for _, testCase := range testCases {
57101
t.Run(testCase.name, func(t *testing.T) {
102+
err = storage.Job.CreateJob(testCase.job)
103+
assert.Equal(t, nil, err)
58104
go testCase.jobManager.Start(storage.Cluster.ActiveClusters, storage.Job.ListQueueJob)
59105
time.Sleep(2 * time.Second)
60106
})

0 commit comments

Comments
 (0)