Skip to content

Commit 317762a

Browse files
committed
Review feedback /1
1 parent eda0caa commit 317762a

File tree

4 files changed

+32
-18
lines changed

4 files changed

+32
-18
lines changed

src/java/org/apache/cassandra/replication/CoordinatorLog.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.slf4j.Logger;
3232
import org.slf4j.LoggerFactory;
3333

34+
import com.google.common.annotations.VisibleForTesting;
3435
import com.google.common.base.Preconditions;
3536

3637
import org.agrona.collections.Int2ObjectHashMap;
@@ -387,7 +388,7 @@ private long nextSequenceId()
387388
int prevTimestamp = MutationId.timestamp(prev);
388389

389390
// int overflow
390-
if (prevOffset == Integer.MAX_VALUE)
391+
if (prevOffset == MAX_OFFSET)
391392
return -1;
392393

393394
int nextOffset = prevOffset + 1;
@@ -517,4 +518,13 @@ void deleteFromSystemTable()
517518
{
518519
executeInternal(DELETE_QUERY, keyspace, range.left.toString(), range.right.toString(), logId.hostId, logId.hostLogId);
519520
}
521+
522+
@VisibleForTesting
523+
static void overrideMaxOffsetForTesting(int nexMaxOffset)
524+
{
525+
MAX_OFFSET = nexMaxOffset;
526+
}
527+
// don't make volatile unless it genuinely is an issue for some test,
528+
// otherwise it should be *fine* as is, and slight overkill to make volatile
529+
private static int MAX_OFFSET = Integer.MAX_VALUE;
520530
}

src/java/org/apache/cassandra/replication/MutationTrackingService.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -306,10 +306,7 @@ public boolean isDurablyReconciled(ImmutableCoordinatorLogOffsets logOffsets)
306306
{
307307
Shard shard = log2ShardMap.get(new CoordinatorLogId(logId));
308308
if (shard == null)
309-
{
310-
logger.debug("Could not find shard for logId {}", logId);
311-
return false;
312-
}
309+
throw new IllegalStateException("Could not find shard for logId " + logId);
313310

314311
if (!shard.isDurablyReconciled(logId, logOffsets))
315312
return false;

src/java/org/apache/cassandra/replication/Node2OffsetsMap.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@
3333
*/
3434
public class Node2OffsetsMap
3535
{
36-
private final Int2ObjectHashMap<Offsets.Mutable> offsetstMap;
36+
private final Int2ObjectHashMap<Offsets.Mutable> offsetsMap;
3737

3838
public Node2OffsetsMap()
3939
{
40-
offsetstMap = new Int2ObjectHashMap<>(8, 0.65f, false);
40+
offsetsMap = new Int2ObjectHashMap<>(8, 0.65f, false);
4141
}
4242

4343
public static Node2OffsetsMap forParticipants(CoordinatorLogId logId, Participants participants)
@@ -50,21 +50,21 @@ public static Node2OffsetsMap forParticipants(CoordinatorLogId logId, Participan
5050

5151
void set(int node, Offsets.Mutable offsets)
5252
{
53-
offsetstMap.put(node, offsets);
53+
offsetsMap.put(node, offsets);
5454
}
5555

5656
@Nonnull
5757
Offsets.Mutable get(int node)
5858
{
59-
return Preconditions.checkNotNull(offsetstMap.get(node));
59+
return Preconditions.checkNotNull(offsetsMap.get(node));
6060
}
6161

6262
Offsets.Mutable intersection()
6363
{
64-
Iterator<Offsets.Mutable> iter = offsetstMap.values().iterator();
64+
Iterator<Offsets.Mutable> iter = offsetsMap.values().iterator();
6565

6666
Preconditions.checkArgument(iter.hasNext());
67-
if (offsetstMap.size() == 1)
67+
if (offsetsMap.size() == 1)
6868
return Offsets.Mutable.copy(iter.next());
6969

7070
Offsets.Mutable intersection = iter.next();
@@ -75,31 +75,31 @@ Offsets.Mutable intersection()
7575

7676
public void add(int node, Offsets offsets)
7777
{
78-
Offsets.Mutable current = offsetstMap.get(node);
78+
Offsets.Mutable current = offsetsMap.get(node);
7979
if (current != null)
8080
current.addAll(offsets);
8181
else
82-
offsetstMap.put(node, Offsets.Mutable.copy(offsets));
82+
offsetsMap.put(node, Offsets.Mutable.copy(offsets));
8383
}
8484

8585
public void forEach(IntObjConsumer<Offsets.Mutable> consumer)
8686
{
87-
offsetstMap.forEachInt(consumer);
87+
offsetsMap.forEachInt(consumer);
8888
}
8989

9090
public void clear()
9191
{
92-
offsetstMap.clear();
92+
offsetsMap.clear();
9393
}
9494

9595
public int size()
9696
{
97-
return offsetstMap.size();
97+
return offsetsMap.size();
9898
}
9999

100100
void convertToPrimitiveMap(Map<Integer, List<Integer>> into)
101101
{
102-
for (Int2ObjectHashMap<Offsets.Mutable>.EntryIterator iter = offsetstMap.entrySet().iterator(); iter.hasNext();)
102+
for (Int2ObjectHashMap<Offsets.Mutable>.EntryIterator iter = offsetsMap.entrySet().iterator(); iter.hasNext();)
103103
{
104104
iter.next();
105105
into.put(iter.getIntKey(), iter.getValue().asList());
@@ -120,6 +120,6 @@ public boolean equals(Object o)
120120
if (!(o instanceof Node2OffsetsMap))
121121
return false;
122122
Node2OffsetsMap that = (Node2OffsetsMap) o;
123-
return this.offsetstMap.equals(that.offsetstMap);
123+
return this.offsetsMap.equals(that.offsetsMap);
124124
}
125125
}

src/java/org/apache/cassandra/replication/Shard.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828

2929
import com.google.common.base.Preconditions;
3030

31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
3134
import org.agrona.collections.IntArrayList;
3235
import org.apache.cassandra.cql3.UntypedResultSet;
3336
import org.apache.cassandra.db.Mutation;
@@ -49,6 +52,8 @@
4952

5053
public class Shard
5154
{
55+
private static final Logger logger = LoggerFactory.getLogger(Shard.class);
56+
5257
final int localNodeId;
5358
final String keyspace;
5459
final Range<Token> range;
@@ -103,7 +108,9 @@ synchronized private MutationId maybeRotateLocalLogAndGetNextId()
103108
MutationId nextId = currentLocalLog.nextId();
104109
if (nextId != null) // another thread got to rotate before us
105110
return nextId;
111+
CoordinatorLogId oldLogId = currentLocalLog.logId;
106112
currentLocalLog = createNewPrimayLog();
113+
logger.info("Rotated primary log for {}/{} from {} to {}", keyspace, range, oldLogId, currentLocalLog.logId);
107114
return nextId();
108115
}
109116

0 commit comments

Comments
 (0)