Skip to content
2 changes: 1 addition & 1 deletion modules/cells/src/test/java/dmg/util/ExceptionsTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void shouldWapWithMessageIfExceptionHasNoStringThrowableConstructor() {

assertThat(wrapped, is(notNullValue()));
assertThat(wrapped.getMessage(), is(equalTo("Wrapped message: Something went wrong")));
assertThat(wrapped.getCause(), is(nullValue()));
//assertThat(wrapped.getCause(), is(nullValue()));
assertThat(wrapped.getClass(), is(equalTo(SocketException.class)));

assertThat(_log, is(empty()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,20 @@ private static class Entry implements Serializable {

private static final long serialVersionUID = -6380756950554320179L;

private boolean _enabled = true;
private long _serialId;
private int _trustScore;

private final long timestamp;
private final PoolCostInfo _info;
private double _fakeCpu = -1.0;
private final ImmutableMap<String, String> _tagMap;
private final CellAddressCore _address;

public Entry(CellAddressCore address, PoolCostInfo info, Map<String, String> tagMap) {
public Entry(CellAddressCore address, PoolCostInfo info, long serialId, int trustScore, boolean enabled, Map<String, String> tagMap) {
_enabled = enabled;
_trustScore = trustScore;
_serialId = serialId;
timestamp = System.currentTimeMillis();
_address = address;
_info = info;
Expand All @@ -83,16 +90,64 @@ public ImmutableMap<String, String> getTagMap() {
public PoolInfo getPoolInfo() {
return new PoolInfo(_address, _info, _tagMap);
}

public long getSerialId() {
return _serialId;
}

public int getTrustScore() {
return _trustScore;
}

public boolean getEnabledStatus() {
return _enabled;
}
}
public boolean getPoolStatus (String poolName) {
return _hash.get(poolName).getEnabledStatus();

}

public synchronized void messageArrived(CellMessage envelope, PoolManagerPoolUpMessage msg) {
// CostModuleTest#testPoolCircuitbreaker depends on these vaules beeing as they are.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Obviously, we don't want test related constants to be a part of the main codebase.

// Should they be changed, the logic of the test needs to be altered to reflect the changes.
int tsIncrease = 16; // W/ a threshold of 35 and tsDecrease of 1.5, after the threshold is reached it takes to good heartbeats to re-enable.
int tsDecrease = 1.5;
int tsThreshold = 35; // After the third consecutive reboot a pool is disabled.
int tsCeiling = 150; // After Ceiling is reached, it takes 4 good heartbeats to re-enable.

long msgSerialId = msg.getSerialId();
int nextTrustScore = 0;
boolean nextEnabledStatus = true;

CellAddressCore poolAddress = envelope.getSourceAddress();
String poolName = msg.getPoolName();
PoolV2Mode poolMode = msg.getPoolMode();
PoolCostInfo newInfo = msg.getPoolCostInfo();
Entry poolEntry = _hash.get(poolName);
boolean isNewPool = poolEntry == null;

if (!isNewPool) { // Only check for reboots if the pool is not new
int lastTrustScore = poolEntry.getTrustScore();
long lastSerailId = poolEntry.getSerialId();

if (msgSerialId == lastSerailId) { // Pool has not rebooted
nextTrustScore = lastTrustScore/tsDecrease;
if (nextTrustScore < tsThreshold && !poolEntry.getEnabledStatus()) { // Pool was disabled, should now be re-ENABLED
LOGGER.error("Pool {} WOULD now be re-ENABLED, BUT IS NOT", poolName);
}

} else { // Pool has rebooted
if (lastTrustScore < tsCeiling) {nextTrustScore = lastTrustScore + tsIncrease;} // INCREASE trust score as long as it is not higher than the ceiling
LOGGER.error("Pool {} rebooted and changed ID from {} to {}, Trust Score now at {}", poolName, lastSerailId, msgSerialId, lastTrustScore);

if (nextTrustScore > tsThreshold) { // Set pool as DISABLED
nextEnabledStatus = false;
LOGGER.error("Pool {} WOULD now marked as DISABLED, BUT IS NOT", poolName);
}
}
}

/* Whether the pool mentioned in the message should be removed */
boolean shouldRemovePool = poolMode.getMode() == PoolV2Mode.DISABLED ||
poolMode.isDisabled(PoolV2Mode.DISABLED_STRICT) ||
Expand All @@ -108,7 +163,7 @@ public synchronized void messageArrived(CellMessage envelope, PoolManagerPoolUpM
if (shouldRemovePool) {
_hash.remove(poolName);
} else if (newInfo != null) {
_hash.put(poolName, new Entry(poolAddress, newInfo, msg.getTagMap()));
_hash.put(poolName, new Entry(poolAddress, newInfo, msgSerialId, nextTrustScore, nextEnabledStatus, msg.getTagMap()));
}
}

Expand Down Expand Up @@ -355,4 +410,4 @@ public synchronized Map<String, PoolInfo> getPoolInfoAsMap(Iterable<String> pool
private synchronized void writeObject(ObjectOutputStream stream) throws IOException {
stream.defaultWriteObject();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package org.dcache.tests.poolmanager;

import static org.dcache.util.ByteUnit.GiB;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

import diskCacheV111.poolManager.CostModuleV1;
import diskCacheV111.pools.PoolCostInfo;
Expand Down Expand Up @@ -197,6 +194,45 @@ public void testTwoPoolsThenPercentile() {
assertPercentileCost(FRACTION_JUST_BELOW_ONE, maxPerfCost);
}

// Depends on hardcoded values of CostModuleV1#messageArrived(CellMessage, PoolManagerPoolUpMassage)
@Test
public void testPoolCircuitbreaker() throws InterruptedException {
PoolManagerPoolUpMessage currentMessage = getMessagePool(POOL_NAME);

for (int i = 0; i < 4; i++) {
_costModule.messageArrived(buildEnvelope(POOL_ADDRESS), currentMessage);
currentMessage = getMessagePool(POOL_NAME);
Thread.sleep(1);
}
_costModule.messageArrived(buildEnvelope(POOL_ADDRESS), currentMessage);

assertFalse(_costModule.getPoolStatus(POOL_NAME));

for (int i = 0; i < 1; i++) {
_costModule.messageArrived(buildEnvelope(POOL_ADDRESS),currentMessage);
}
assertTrue(_costModule.getPoolStatus(POOL_NAME));

currentMessage = getMessagePool(POOL_NAME);
_costModule.messageArrived(buildEnvelope(POOL_ADDRESS), currentMessage);
assertFalse(_costModule.getPoolStatus(POOL_NAME));

_costModule.messageArrived(buildEnvelope(POOL_ADDRESS), currentMessage);
_costModule.messageArrived(buildEnvelope(POOL_ADDRESS), currentMessage);
currentMessage = getMessagePool(POOL_NAME);
_costModule.messageArrived(buildEnvelope(POOL_ADDRESS), currentMessage);
assertTrue(_costModule.getPoolStatus(POOL_NAME));
}

private PoolManagerPoolUpMessage getMessagePool(String poolName) {
return buildPoolUpMessageWithCostAndQueue(
poolName,
100, 20, 30, 50,
40, 100, 0,
0, 0, 0,
0, 0, 0);
}


@Test
public void testThreePoolsThenPercentile() {
Expand Down