Skip to content

Commit 49f5af9

Browse files
authored
Add ElasticResourceQuota sync for queue (#28)
Add ElasticResourceQuota sync
1 parent 7311c26 commit 49f5af9

File tree

4 files changed

+84
-57
lines changed

4 files changed

+84
-57
lines changed

pkg/common/k8s/api.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ var (
5252
SparkAppGVK: SparkAppStatus,
5353
PaddleJobGVK: PaddleJobStatus,
5454
}
55+
// GVKToQuotaType GroupVersionKind lists for PaddleFlow QuotaType
56+
GVKToQuotaType = []schema.GroupVersionKind{
57+
VCQueueGVK,
58+
EQuotaGVK,
59+
}
5560
)
5661

5762
type StatusInfo struct {

pkg/job/runtime/kubernetes/controller/job_gc_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
var (
4545
VCJobGVR = schema.GroupVersionResource{Group: "batch.volcano.sh", Version: "v1alpha1", Resource: "jobs"}
4646
VCQueueGVR = schema.GroupVersionResource{Group: "scheduling.volcano.sh", Version: "v1beta1", Resource: "queues"}
47+
EQuotaGVR = schema.GroupVersionResource{Group: "scheduling.volcano.sh", Version: "v1beta1", Resource: "elasticresourcequotas"}
4748
PodGVR = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
4849
)
4950

@@ -81,6 +82,7 @@ var DiscoveryHandlerFunc = http.HandlerFunc(func(w http.ResponseWriter, req *htt
8182
GroupVersion: "scheduling.volcano.sh/v1beta1",
8283
APIResources: []metav1.APIResource{
8384
{Name: "queues", Namespaced: false, Kind: "Queue"},
85+
{Name: "elasticresourcequotas", Namespaced: false, Kind: "ElasticResourceQuota"},
8486
},
8587
}
8688
case "/apis/sparkoperator.k8s.io/v1beta2":

pkg/job/runtime/kubernetes/controller/queue_sync.go

Lines changed: 57 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,13 @@ import (
2323

2424
log "github.com/sirupsen/logrus"
2525
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
26-
"k8s.io/apimachinery/pkg/runtime"
26+
"k8s.io/apimachinery/pkg/runtime/schema"
2727
"k8s.io/apimachinery/pkg/util/wait"
2828
"k8s.io/client-go/tools/cache"
2929
"k8s.io/client-go/util/workqueue"
30-
vcqueue "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
31-
3230
"paddleflow/pkg/apiserver/models"
3331
"paddleflow/pkg/common/k8s"
34-
"paddleflow/pkg/common/schema"
32+
queueschema "paddleflow/pkg/common/schema"
3533
)
3634

3735
const (
@@ -47,9 +45,10 @@ type QueueSyncInfo struct {
4745

4846
type QueueSync struct {
4947
sync.Mutex
50-
opt *k8s.DynamicClientOption
51-
jobQueue workqueue.RateLimitingInterface
52-
vcQueueInformer cache.SharedIndexInformer
48+
opt *k8s.DynamicClientOption
49+
jobQueue workqueue.RateLimitingInterface
50+
// informerMap contains GroupVersionKind and informer for queue, and ElasticResourceQuota
51+
informerMap map[schema.GroupVersionKind]cache.SharedIndexInformer
5352
}
5453

5554
func NewQueueSync() Controller {
@@ -64,29 +63,32 @@ func (qs *QueueSync) Initialize(opt *k8s.DynamicClientOption) error {
6463
log.Infof("Initialize %s controller!", qs.Name())
6564
qs.opt = opt
6665
qs.jobQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
67-
68-
vcQueueGVRMap, err := qs.opt.GetGVR(k8s.VCQueueGVK)
69-
if err != nil {
70-
log.Warnf("cann't find GroupVersionKind [%s]", k8s.VCQueueGVK)
71-
} else {
72-
qs.vcQueueInformer = qs.opt.DynamicFactory.ForResource(vcQueueGVRMap.Resource).Informer()
73-
qs.vcQueueInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
74-
UpdateFunc: qs.updateQueue,
75-
DeleteFunc: qs.deleteQueue,
76-
})
66+
qs.informerMap = make(map[schema.GroupVersionKind]cache.SharedIndexInformer)
67+
68+
for _, gvk := range k8s.GVKToQuotaType {
69+
gvrMap, err := qs.opt.GetGVR(gvk)
70+
if err != nil {
71+
log.Warnf("cann't find GroupVersionKind [%s]", gvk)
72+
} else {
73+
qs.informerMap[gvk] = qs.opt.DynamicFactory.ForResource(gvrMap.Resource).Informer()
74+
qs.informerMap[gvk].AddEventHandler(cache.ResourceEventHandlerFuncs{
75+
UpdateFunc: qs.updateQueue,
76+
DeleteFunc: qs.deleteQueue,
77+
})
78+
}
7779
}
7880
return nil
7981
}
8082

8183
func (qs *QueueSync) Run(stopCh <-chan struct{}) {
82-
if qs.vcQueueInformer == nil {
83-
log.Infof("Cluster hasn't vc queue, skip %s controller!", qs.Name())
84+
if len(qs.informerMap) == 0 {
85+
log.Infof("Cluster hasn't needed GroupVersionKind, skip %s controller!", qs.Name())
8486
return
8587
}
8688
go qs.opt.DynamicFactory.Start(stopCh)
8789

88-
if qs.vcQueueInformer != nil {
89-
if !cache.WaitForCacheSync(stopCh, qs.vcQueueInformer.HasSynced) {
90+
for _, informer := range qs.informerMap {
91+
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
9092
log.Errorf("timed out waiting for caches to %s", qs.Name())
9193
return
9294
}
@@ -124,51 +126,57 @@ func (qs *QueueSync) processWorkItem() bool {
124126

125127
// updateQueue for queue update event
126128
func (qs *QueueSync) updateQueue(oldObj, newObj interface{}) {
127-
oldQueue, err := unstructuredToVCQueue(oldObj)
128-
if err != nil {
129+
oldQueue := oldObj.(*unstructured.Unstructured)
130+
queue := newObj.(*unstructured.Unstructured)
131+
132+
oldSpec, err := getResourceSpec(oldQueue)
133+
if err != nil || oldSpec == nil {
134+
log.Errorf("get spec from old resource object %s failed", oldQueue.GetName())
129135
return
130136
}
131-
queue, err := unstructuredToVCQueue(newObj)
132-
if err != nil {
137+
spec, err := getResourceSpec(queue)
138+
if err != nil || spec == nil {
139+
log.Errorf("get spec from new resource object %s failed", queue.GetName())
133140
return
134141
}
135-
log.Debugf("vcQueueInformer update begin. oldQueue:%v newQueue:%v", oldQueue, queue)
136-
if reflect.DeepEqual(oldQueue.Spec, queue.Spec) && oldQueue.Status == queue.Status {
142+
143+
log.Debugf("%s queue resource is updated. old:%v new:%v", queue.GroupVersionKind(), oldQueue, queue)
144+
if reflect.DeepEqual(oldSpec, spec) {
137145
return
138146
}
139-
status := string(queue.Status.State)
140-
if !reflect.DeepEqual(oldQueue.Spec, queue.Spec) {
141-
status = schema.StatusQueueUnavailable
142-
}
147+
148+
log.Infof("watch %s resource is updated, name is %s", queue.GroupVersionKind(), queue.GetName())
149+
var status = queueschema.StatusQueueUnavailable
143150
queueInfo := &QueueSyncInfo{
144-
Name: queue.Name,
151+
Name: queue.GetName(),
145152
Status: status,
146-
Message: fmt.Sprintf("queue[%s] is update from cluster", queue.Name),
153+
Message: fmt.Sprintf("queue[%s] is update from cluster", queue.GetName()),
147154
}
148155
qs.jobQueue.Add(queueInfo)
149156
}
150157

151-
// deleteQueue for queue delete event
158+
// deleteQueue for queue resource delete event
152159
func (qs *QueueSync) deleteQueue(obj interface{}) {
153-
queue, err := unstructuredToVCQueue(obj)
154-
if err != nil {
155-
return
156-
}
157-
log.Debugf("vcQueueInformer DeleteFunc. queueName:%s", queue.Name)
160+
queueObj := obj.(*unstructured.Unstructured)
161+
log.Infof("watch %s resource is deleted, name is %s", queueObj.GroupVersionKind(), queueObj.GetName())
162+
158163
queueInfo := &QueueSyncInfo{
159-
Name: queue.Name,
160-
Status: schema.StatusQueueUnavailable,
161-
Message: fmt.Sprintf("queue[%s] is deleted from cluster", queue.Name),
164+
Name: queueObj.GetName(),
165+
Status: queueschema.StatusQueueUnavailable,
166+
Message: fmt.Sprintf("queue resource[%s] is deleted from cluster", queueObj.GetName()),
162167
}
163168
qs.jobQueue.Add(queueInfo)
164169
}
165170

166-
func unstructuredToVCQueue(obj interface{}) (*vcqueue.Queue, error) {
167-
queueObj := obj.(*unstructured.Unstructured)
168-
queue := &vcqueue.Queue{}
169-
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(queueObj.Object, queue); err != nil {
170-
log.Errorf("convert unstructured object[%+v] to sparkApp failed: %v", obj, err)
171-
return nil, err
171+
func getResourceSpec(queueObj *unstructured.Unstructured) (interface{}, error) {
172+
spec, ok, unerr := unstructured.NestedFieldCopy(queueObj.Object, "spec")
173+
if !ok {
174+
if unerr != nil {
175+
log.Error(unerr, "NestedFieldCopy unstructured to spec error")
176+
return nil, unerr
177+
}
178+
log.Info("NestedFieldCopy unstructured to spec error: Spec is not found in resource")
179+
return nil, fmt.Errorf("get spec from unstructured object failed")
172180
}
173-
return queue, nil
181+
return spec, nil
174182
}

pkg/job/runtime/kubernetes/controller/queue_sync_test.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package controller
1818

1919
import (
2020
"context"
21+
"k8s.io/apimachinery/pkg/runtime/schema"
2122
"testing"
2223
"time"
2324

@@ -57,14 +58,25 @@ func newFakeQueueSyncController() *QueueSync {
5758

5859
func TestQueueSync(t *testing.T) {
5960
tests := []struct {
60-
name string
61-
oldObj *unstructured.Unstructured
62-
newObj *unstructured.Unstructured
61+
name string
62+
queueName string
63+
gvr schema.GroupVersionResource
64+
oldObj *unstructured.Unstructured
65+
newObj *unstructured.Unstructured
6366
}{
6467
{
65-
name: "queue create",
66-
oldObj: NewUnstructured(k8s.VCQueueGVK, "", "q1"),
67-
newObj: NewUnstructured(k8s.VCQueueGVK, "", "q1"),
68+
name: "volcano queue capability quota",
69+
queueName: "q1",
70+
gvr: VCQueueGVR,
71+
oldObj: NewUnstructured(k8s.VCQueueGVK, "", "q1"),
72+
newObj: NewUnstructured(k8s.VCQueueGVK, "", "q1"),
73+
},
74+
{
75+
name: "elastic resource quota",
76+
queueName: "elasticQuota1",
77+
gvr: EQuotaGVR,
78+
oldObj: NewUnstructured(k8s.EQuotaGVK, "", "elasticQuota1"),
79+
newObj: NewUnstructured(k8s.EQuotaGVK, "", "elasticQuota1"),
6880
},
6981
}
7082

@@ -73,12 +85,12 @@ func TestQueueSync(t *testing.T) {
7385
ctx := &logger.RequestContext{UserName: "test"}
7486
db_fake.InitFakeDB()
7587
err := models.CreateQueue(ctx, &models.Queue{
76-
Name: "q1",
88+
Name: test.queueName,
7789
})
7890
assert.Equal(t, nil, err)
7991

8092
c := newFakeQueueSyncController()
81-
_, err = c.opt.DynamicClient.Resource(VCQueueGVR).Create(context.TODO(), test.newObj, metav1.CreateOptions{})
93+
_, err = c.opt.DynamicClient.Resource(test.gvr).Create(context.TODO(), test.newObj, metav1.CreateOptions{})
8294
assert.Equal(t, nil, err)
8395
c.updateQueue(test.oldObj, test.newObj)
8496
c.deleteQueue(test.newObj)

0 commit comments

Comments
 (0)