Commit 9997c2c

Merge pull request #12 from CommonWorkflowScheduler/memoryPredictionChanges
Memory prediction changes
2 parents d38c179 + 71f7aab commit 9997c2c

71 files changed: +3227 -138 lines changed

README.md

Lines changed: 86 additions & 13 deletions
@@ -13,19 +13,20 @@ docker push <your docker account>/cws:<version>

 #### API Endpoints

-| # | Resource | Method |
-| -- | :---------------------------------- | :----: |
-| 1 | /{version}/{execution} | POST |
-| 2 | /{version}/{execution} | DELETE |
-| 3 | /{version}/{execution}/DAG/vertices | POST |
-| 4 | /{version}/{execution}/DAG/vertices | DELETE |
-| 5 | /{version}/{execution}/DAG/edges | POST |
-| 6 | /{version}/{execution}/DAG/edges | DELETE |
-| 7 | /{version}/{execution}/startBatch | PUT |
-| 8 | /{version}/{execution}/endBatch | PUT |
-| 9 | /{version}/{execution}/task/{id} | POST |
-| 10 | /{version}/{execution}/task/{id} | GET |
-| 11 | /{version}/{execution}/task/{id} | DELETE |
+| # | Resource | Method |
+|----|:---------------------------------------------------|:------:|
+| 1 | /{version}/scheduler/{execution} | POST |
+| 2 | /{version}/scheduler/{execution} | DELETE |
+| 3 | /{version}/scheduler/{execution}/DAG/vertices | POST |
+| 4 | /{version}/scheduler/{execution}/DAG/vertices | DELETE |
+| 5 | /{version}/scheduler/{execution}/DAG/edges | POST |
+| 6 | /{version}/scheduler/{execution}/DAG/edges | DELETE |
+| 7 | /{version}/scheduler/{execution}/startBatch | PUT |
+| 8 | /{version}/scheduler/{execution}/endBatch | PUT |
+| 9 | /{version}/scheduler/{execution}/task/{id} | POST |
+| 10 | /{version}/scheduler/{execution}/task/{id} | GET |
+| 11 | /{version}/scheduler/{execution}/task/{id} | DELETE |
+| 12 | /{version}/scheduler/{execution}/metrics/task/{id} | POST |

 SWAGGER: /swagger-ui.html <br>
 API-DOCS: /v3/api-docs
@@ -122,6 +123,78 @@ spec:
 claimName: api-exp-data
 ```

+#### Profiles
+This is a Spring Boot application that can be run with profiles. The "default" profile is used if no configuration is set. The "dev" profile can be enabled by setting the JVM system property
+
+    -Dspring.profiles.active=dev
+or the environment variable
+
+    export spring_profiles_active=dev
+or via the corresponding setting in your development environment or within the pod definition.
+
+Example:
+
+    $ SCHEDULER_NAME=workflow-scheduler java -Dspring.profiles.active=dev -jar cws-k8s-scheduler-1.2.jar
+
+The "dev" profile is useful for debugging and reporting problems because it increases the log level.
+
+---
+#### Memory Prediction and Task Scaling
+- The Kubernetes feature gate InPlacePodVerticalScaling must be enabled. It is available starting from Kubernetes v1.27. See [KEP 1287](https://github.com/kubernetes/enhancements/issues/1287) for the current status.
+- Supported if used together with [nf-cws](https://github.com/CommonWorkflowScheduler/nf-cws) version 1.0.5 or newer.
+
+The memory predictor used for task scaling is set via the configuration. If it is not set, task scaling is disabled.
+The memory predictor is provided as a string following the pattern "&lt;memory predictor&gt;-[&lt;additional&gt;=&lt;parameter&gt;]", e.g., "linear-offset=std".
+The following strategies are available:
+
+| Memory Predictor | Behaviour |
+|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| linear/lr | The Linear predictor predicts memory usage as a linear function of the task input size. |
+| linear2/lr2 | The Linear predictor with an unequal loss function. The loss penalizes underprediction more than overprediction. |
+| mean | The Mean predictor predicts the mean memory seen so far. The prediction is independent of the input size. |
+| ponder | The Ponder predictor is an advanced memory prediction strategy that ponders between linear regression with unequal loss and historic values. Details are provided in our paper [tbd](). |
+| constX | Predicts a constant value (X). If no X is given, it predicts 0. |
+| polyX | Predicts with a polynomial of degree X in the task's input size. If no X is provided, X=2 is used. |
+
+
+The offset uses the current prediction model to predict the memory for all finished tasks.
+It then calculates the difference between the observed memory and the predicted memory.
+
+| Offset | Behaviour |
+|-------------|---------------------------------------------------------------------------------------------------------------|
+| none | No additional offset will be applied. |
+| "" | If no offset is defined, the max offset will be used. |
+| max | The max offset returns the largest underprediction. |
+| Xpercentile | X is an integer between 1 and 100. The Xth percentile over all prediction differences is used as the offset. |
+| var | This offset applies the variance as an offset. |
+| Xstd | This offset applies X times the standard deviation as an offset. If no X is provided, it uses X=1. |
+
+#### Scheduling strategies
+
+The scheduling strategy can be set via the configuration.
+The scheduling strategy is provided as a string following the pattern "&lt;scheduling strategy&gt;[-&lt;node assignment strategy&gt;]".
+The following strategies are available:
+
+| Scheduling Strategy | Behaviour |
+|---------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------|
+| fifo | Tasks that have been submitted earlier will be scheduled earlier. |
+| rank | Tasks will be prioritized based on their rank in the DAG. |
+| rank_min | Rank (min): same as rank, but breaks ties in favor of tasks with smaller input size. |
+| rank_max | Rank (max): same as rank, but breaks ties in favor of tasks with larger input size. |
+| lff_min | Least finished first (min): prioritizes abstract tasks where fewer instances have finished; breaks ties with rank_min. |
+| lff_max | Least finished first (max): prioritizes abstract tasks where fewer instances have finished; breaks ties with rank_max. |
+| gs_min | Generate Samples (min): hybrid of LFF (min) and Rank (max). Prioritizes abstract tasks with fewer than five finished instances; afterwards, uses Rank (max). |
+| gs_max | Generate Samples (max): hybrid of LFF (max) and Rank (max). Prioritizes abstract tasks with fewer than five finished instances; afterwards, uses Rank (max). |
+| random | Randomly prioritize tasks. |
+| max | Prioritize tasks with larger input size. |
+| min | Prioritize tasks with smaller input size. |
+
+| Node Assignment Strategy | Behaviour |
+|--------------------------|-----------------------------------------------------------------------------------------|
+| random | Randomly distributes the tasks to nodes. |
+| roundrobin | (default) Assigns tasks in a round robin fashion to the nodes. |
+| fair | Distributes the tasks fairly to the nodes trying to achieve equal load on all machines. |
+
 ---

 If you use this software or artifacts in a publication, please cite it as:
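
The offset rules from the "Memory Prediction and Task Scaling" section of the README can be illustrated with a small sketch. This is not the scheduler's implementation. The class `OffsetSketch`, its method names, the use of the population variance, the nearest-rank percentile, and clamping the max offset at zero are all assumptions made for illustration.

```java
import java.util.Arrays;

// Illustrative only: names and numeric conventions are assumptions, not the scheduler's API.
final class OffsetSketch {

    /** observed - predicted per finished task; positive values are underpredictions. */
    static double[] differences(double[] observed, double[] predicted) {
        double[] diff = new double[observed.length];
        for (int i = 0; i < observed.length; i++) {
            diff[i] = observed[i] - predicted[i];
        }
        return diff;
    }

    /** Resolves an offset name from the README table ("none", "", "max", "var", "Xstd", "Xpercentile"). */
    static double offset(String spec, double[] diff) {
        if (diff.length == 0 || "none".equals(spec)) {
            return 0;
        }
        if (spec == null || spec.isEmpty() || "max".equals(spec)) {
            // Largest underprediction; clamping at 0 is an assumption of this sketch.
            return Math.max(0, Arrays.stream(diff).max().getAsDouble());
        }
        if ("var".equals(spec)) {
            return variance(diff);
        }
        if (spec.endsWith("std")) {
            String x = spec.substring(0, spec.length() - "std".length());
            double factor = x.isEmpty() ? 1 : Double.parseDouble(x); // X defaults to 1
            return factor * Math.sqrt(variance(diff));
        }
        if (spec.endsWith("percentile")) {
            String x = spec.substring(0, spec.length() - "percentile".length());
            return percentile(diff, Integer.parseInt(x));
        }
        throw new IllegalArgumentException("Unknown offset: " + spec);
    }

    /** Population variance (assumption; a sample variance would divide by n - 1). */
    static double variance(double[] values) {
        double mean = Arrays.stream(values).average().orElse(0);
        double sum = 0;
        for (double v : values) {
            sum += (v - mean) * (v - mean);
        }
        return sum / values.length;
    }

    /** Nearest-rank percentile for x in 1..100 (assumption about the definition). */
    static double percentile(double[] values, int x) {
        if (x < 1 || x > 100) {
            throw new IllegalArgumentException("X must be between 1 and 100");
        }
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        int rank = (x * sorted.length + 99) / 100; // integer ceiling of x/100 * n
        return sorted[rank - 1];
    }
}
```

With differences of -10, 20, and -5, for example, the `max` offset in this sketch is 20, so the next prediction would be raised by the largest underprediction seen so far.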

pom.xml

Lines changed: 11 additions & 8 deletions
@@ -45,13 +45,7 @@
         <dependency>
             <groupId>io.fabric8</groupId>
             <artifactId>kubernetes-client</artifactId>
-            <version>6.9.0</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.javatuples</groupId>
-            <artifactId>javatuples</artifactId>
-            <version>1.2</version>
+            <version>6.13.1</version>
         </dependency>

         <dependency>
@@ -75,12 +69,14 @@
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-web</artifactId>
+            <version>3.3.1</version>
         </dependency>

         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-test</artifactId>
             <scope>test</scope>
+            <version>3.3.1</version>
         </dependency>

         <dependency>
@@ -106,7 +102,7 @@
         <dependency>
             <groupId>commons-net</groupId>
             <artifactId>commons-net</artifactId>
-            <version>3.8.0</version>
+            <version>3.10.0</version>
         </dependency>

         <dependency>
@@ -118,6 +114,7 @@
         <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-core</artifactId>
+            <version>1.5.3</version>
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
@@ -134,6 +131,12 @@
             <artifactId>jackson-annotations</artifactId>
         </dependency>

+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+            <version>3.6.1</version>
+        </dependency>
+
     </dependencies>

     <build>

src/main/java/cws/k8s/scheduler/Main.java

Lines changed: 1 addition & 1 deletion
@@ -1,12 +1,12 @@
 package cws.k8s.scheduler;

+import jakarta.annotation.PostConstruct;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.info.BuildProperties;

-import jakarta.annotation.PostConstruct;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;

src/main/java/cws/k8s/scheduler/client/CWSKubernetesClient.java

Lines changed: 90 additions & 6 deletions
@@ -2,7 +2,11 @@

 import cws.k8s.scheduler.model.NodeWithAlloc;
 import cws.k8s.scheduler.model.PodWithAge;
+import cws.k8s.scheduler.model.Task;
 import io.fabric8.kubernetes.api.model.*;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.WatcherException;
 import io.fabric8.kubernetes.client.*;
 import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.dsl.MixedOperation;
@@ -12,6 +16,7 @@
 import lombok.extern.slf4j.Slf4j;

 import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.util.*;

 @Slf4j
@@ -22,7 +27,6 @@ public class CWSKubernetesClient {
     private final Map<String, NodeWithAlloc> nodeHolder = new HashMap<>();
     private final List<Informable> informables = new LinkedList<>();

-
     public CWSKubernetesClient() {
         KubernetesClientBuilder builder = new KubernetesClientBuilder();
         this.client = builder.build();
@@ -146,6 +150,13 @@ public void eventReceived(Action action, Node node) {
             boolean change = false;
             NodeWithAlloc processedNode = null;
             switch (action) {
+                case MODIFIED:
+                    final NodeWithAlloc nodeWithAlloc = kubernetesClient.nodeHolder.get( node.getMetadata().getName() );
+                    if ( nodeWithAlloc != null ){
+                        nodeWithAlloc.update( node );
+                        break;
+                    }
+                    // If the node is not in the nodeHolder, it is a new node
                 case ADDED:
                     log.info("New Node {} was added", node.getMetadata().getName());
                     synchronized ( kubernetesClient.nodeHolder ){
@@ -175,10 +186,6 @@ public void eventReceived(Action action, Node node) {
                     log.info("Node {} has an error", node.getMetadata().getName());
                     //todo deal with error
                     break;
-                case MODIFIED:
-                    log.info("Node {} was modified", node.getMetadata().getName());
-                    //todo deal with changed state
-                    break;
                 default: log.warn("No implementation for {}", action);
             }
         }
@@ -205,7 +212,7 @@ public void eventReceived(Action action, Pod pod) {
             switch ( action ){
                 case ADDED:
                     if ( !PodWithAge.hasFinishedOrFailed( pod ) ) {
-                        node.addPod(new PodWithAge(pod), false);
+                        node.addPod(new PodWithAge(pod));
                     }
                     break;
                 case MODIFIED:
@@ -236,4 +243,81 @@ public void onClose(WatcherException cause) {

     }

+    public boolean inPlacePodVerticalScalingActive() {
+        return featureGateActive("InPlacePodVerticalScaling");
+    }
+
+    public boolean featureGateActive( String featureGate ){
+        return pods()
+                .inNamespace( "kube-system" )
+                .list()
+                .getItems()
+                .stream()
+                .filter( p -> p.getMetadata().getName().startsWith( "kube-apiserver" ) )
+                .anyMatch( p -> p
+                        .getSpec()
+                        .getContainers()
+                        .stream()
+                        .anyMatch( c -> c
+                                .getCommand()
+                                .contains( "--feature-gates=" + featureGate + "=true" )
+                        )
+                );
+    }
+
+    /**
+     * It will create a patch for the memory limits and request values and submit it
+     * to the cluster.
+     * Moreover, it updates the task with the new pod.
+     *
+     * @param t the task to be patched
+     * @return false if patching failed because of InPlacePodVerticalScaling
+     */
+    public boolean patchTaskMemory( Task t ) {
+        try {
+            final String valueAsString = t.getPlanedRequirements().getRam()
+                    .divide( BigDecimal.valueOf( 1024L * 1024L ) )
+                    .setScale( 0, RoundingMode.CEILING ).toPlainString() + "Mi";
+            final PodWithAge pod = t.getPod();
+            String namespace = pod.getMetadata().getNamespace();
+            String podname = pod.getName();
+            Resource<Pod> podResource = pods()
+                    .inNamespace( namespace )
+                    .withName( podname );
+            Container container = podResource.get().getSpec().getContainers().get(0); // Assuming only one container
+            Container modifiedContainer = new ContainerBuilder(container)
+                    .editOrNewResources()
+                    .removeFromLimits( "memory" )
+                    .removeFromRequests( "memory" )
+                    .addToLimits("memory", new Quantity(valueAsString))
+                    .addToRequests("memory", new Quantity(valueAsString))
+                    .endResources()
+                    .build();
+
+            Pod modifiedPod = new PodBuilder( podResource.get() )
+                    .editOrNewSpec()
+                    .removeFromContainers( container )
+                    .addToContainers(modifiedContainer)
+                    .endSpec()
+                    .editOrNewMetadata()
+                    .addToLabels( "commonworkflowscheduler/memoryscaled", "true" )
+                    .endMetadata()
+                    .build();
+
+            t.setPod( new PodWithAge( modifiedPod ) );
+
+            podResource.patch(modifiedPod);
+
+        } catch ( KubernetesClientException e ) {
+            // this typically happens when the feature gate InPlacePodVerticalScaling was not enabled
+            if (e.toString().contains("Forbidden: pod updates may not change fields other than")) {
+                log.error("Could not patch task. Please make sure that the feature gate 'InPlacePodVerticalScaling' is enabled in Kubernetes. See https://github.com/kubernetes/enhancements/issues/1287 for details. Task scaling will now be disabled for the rest of this workflow execution.");
+            } else {
+                log.error("Could not patch task: {}", t.getConfig().getName(), e);
+            }
+            throw new CannotPatchException( e.getMessage() );
+        }
+        return true;
+    }
+
 }
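
Two details of the diff above may be easier to follow in isolation. `featureGateActive` matches the exact argument `--feature-gates=<gate>=true`, so it would presumably miss an API server started with several comma-separated gates in one flag (and `getCommand()` may be null for a container without a command). `patchTaskMemory` converts a byte count to a rounded-up `Mi` quantity. The following sketch shows a more tolerant flag check and the same conversion. `PodPatchSketch` and both method names are hypothetical and take plain Java values instead of fabric8 objects.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;

// Hypothetical helpers for illustration; they are not part of CWSKubernetesClient.
final class PodPatchSketch {

    /**
     * Returns true if any "--feature-gates=..." argument enables the given gate.
     * The value is split at commas so that "A=false,B=true" still matches B.
     */
    static boolean featureGateActive(List<String> command, String featureGate) {
        if (command == null) {
            return false; // a container may define no command at all
        }
        final String prefix = "--feature-gates=";
        for (String arg : command) {
            if (arg == null || !arg.startsWith(prefix)) {
                continue;
            }
            for (String gate : arg.substring(prefix.length()).split(",")) {
                if (gate.trim().equals(featureGate + "=true")) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Converts a byte count to a quantity string in Mi, rounded up.
     * The division is exact because the divisor is a power of two.
     */
    static String toMebibytes(BigDecimal bytes) {
        return bytes
                .divide(BigDecimal.valueOf(1024L * 1024L))
                .setScale(0, RoundingMode.CEILING)
                .toPlainString() + "Mi";
    }
}
```

Rounding up means the patched limit is never smaller than the planned requirement, at the cost of up to one extra mebibyte.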
Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+package cws.k8s.scheduler.client;
+
+public class CannotPatchException extends RuntimeException {
+
+    public CannotPatchException(String message) {
+        super(message);
+    }
+
+}
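
In the diff above, `patchTaskMemory` signals failure by throwing `CannotPatchException` rather than by returning `false`, and its log message states that task scaling is disabled for the rest of the workflow execution. How the caller does this is not part of the visible diff. The sketch below is one possible shape, assuming a hypothetical `TaskScalerSketch` class and a `Predicate<String>` that stands in for the real patch call. The exception class is redeclared only to keep the example self-contained.

```java
import java.util.function.Predicate;

// Stand-in for cws.k8s.scheduler.client.CannotPatchException, redeclared for self-containment.
class CannotPatchException extends RuntimeException {
    CannotPatchException(String message) {
        super(message);
    }
}

// Hypothetical caller: disables scaling after the first patch failure.
final class TaskScalerSketch {

    private final Predicate<String> patcher; // stands in for patchTaskMemory( Task )
    private boolean scalingEnabled = true;

    TaskScalerSketch(Predicate<String> patcher) {
        this.patcher = patcher;
    }

    boolean isScalingEnabled() {
        return scalingEnabled;
    }

    /** Returns true if the task was patched; never retries once scaling is disabled. */
    boolean tryScale(String taskName) {
        if (!scalingEnabled) {
            return false;
        }
        try {
            return patcher.test(taskName);
        } catch (CannotPatchException e) {
            // Assumption: one failure disables scaling for the remaining execution.
            scalingEnabled = false;
            return false;
        }
    }
}
```

Keeping the flag in the caller avoids repeated failing API calls once the cluster has rejected the first patch.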

src/main/java/cws/k8s/scheduler/dag/Process.java

Lines changed: 20 additions & 0 deletions
@@ -4,13 +4,33 @@
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;

 public class Process extends Vertex {


     private final Set<Process> descendants;
     private final Set<Process> ancestors;

+    private AtomicInteger successfullyFinished = new AtomicInteger(0);
+    private AtomicInteger failed = new AtomicInteger(0);
+
+    public int getSuccessfullyFinished() {
+        return successfullyFinished.get();
+    }
+
+    public int getFailed() {
+        return failed.get();
+    }
+
+    public void incrementSuccessfullyFinished() {
+        successfullyFinished.incrementAndGet();
+    }
+
+    public void incrementFailed() {
+        failed.incrementAndGet();
+    }
+
     void addDescendant( Process p ) {
         synchronized (descendants) {
             descendants.add( p );
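
The counters added to `Process` expose how many instances of an abstract task have finished. A plausible consumer is the `lff_min` strategy described in the README, although the diff shown here does not confirm that link. The sketch below orders tasks by fewest finished instances and breaks ties as `rank_min` would. `SchedulingOrderSketch`, `TaskInfo`, and the choice that a higher rank is scheduled first are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative ordering only; not the scheduler's prioritization code.
final class SchedulingOrderSketch {

    /** Minimal stand-in for a task together with the state of its abstract task. */
    static final class TaskInfo {
        final String name;
        final int finishedInstances; // e.g. Process.getSuccessfullyFinished()
        final int rank;              // rank of the abstract task in the DAG
        final long inputSize;        // input size in bytes

        TaskInfo(String name, int finishedInstances, int rank, long inputSize) {
            this.name = name;
            this.finishedInstances = finishedInstances;
            this.rank = rank;
            this.inputSize = inputSize;
        }
    }

    /** rank_min: higher rank first (assumption), ties go to the smaller input. */
    static final Comparator<TaskInfo> RANK_MIN =
            Comparator.comparingInt((TaskInfo t) -> t.rank).reversed()
                    .thenComparingLong(t -> t.inputSize);

    /** lff_min: fewest finished instances first, ties resolved with rank_min. */
    static final Comparator<TaskInfo> LFF_MIN =
            Comparator.comparingInt((TaskInfo t) -> t.finishedInstances)
                    .thenComparing(RANK_MIN);

    /** Returns the task names in the order the given strategy would schedule them. */
    static List<String> order(List<TaskInfo> tasks, Comparator<TaskInfo> strategy) {
        List<TaskInfo> sorted = new ArrayList<>(tasks);
        sorted.sort(strategy);
        List<String> names = new ArrayList<>();
        for (TaskInfo t : sorted) {
            names.add(t.name);
        }
        return names;
    }
}
```

Composing the comparators keeps each tie-breaking rule in one place, so an `lff_max` variant would only swap the input-size direction.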
