Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 62 additions & 18 deletions src/java/org/apache/cassandra/db/SystemKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,10 @@ private SystemKeyspace()
public static final String REPAIRS = "repairs";
public static final String TOP_PARTITIONS = "top_partitions";
public static final String METADATA_LOG = "local_metadata_log";
public static final String SNAPSHOT_TABLE_NAME = "metadata_snapshots";
public static final String METADATA_SNAPSHOTS = "metadata_snapshots";
public static final String HOST_LOG_ID = "host_log_id";
public static final String SHARDS = "shards";
public static final String COORDINATOR_LOGS = "coordinator_logs";

/**
* By default the system keyspace tables should be stored in a single data directory to allow the server
Expand Down Expand Up @@ -223,14 +226,16 @@ private SystemKeyspace()
TABLE_ESTIMATES_TYPE_LOCAL_PRIMARY, AVAILABLE_RANGES_V2, TRANSFERRED_RANGES_V2, VIEW_BUILDS_IN_PROGRESS,
BUILT_VIEWS, PREPARED_STATEMENTS, REPAIRS, TOP_PARTITIONS, LEGACY_PEERS, LEGACY_PEER_EVENTS,
LEGACY_TRANSFERRED_RANGES, LEGACY_AVAILABLE_RANGES, LEGACY_SIZE_ESTIMATES, LEGACY_SSTABLE_ACTIVITY,
METADATA_LOG, SNAPSHOT_TABLE_NAME, CONSENSUS_MIGRATION_STATE);
METADATA_LOG, METADATA_SNAPSHOTS, CONSENSUS_MIGRATION_STATE,
HOST_LOG_ID, SHARDS, COORDINATOR_LOGS);

public static final Set<String> TABLE_NAMES = ImmutableSet.of(
BATCHES, PAXOS, PAXOS_REPAIR_HISTORY, BUILT_INDEXES, LOCAL, PEERS_V2, PEER_EVENTS_V2,
COMPACTION_HISTORY, SSTABLE_ACTIVITY_V2, TABLE_ESTIMATES, AVAILABLE_RANGES_V2, TRANSFERRED_RANGES_V2, VIEW_BUILDS_IN_PROGRESS,
BUILT_VIEWS, PREPARED_STATEMENTS, REPAIRS, TOP_PARTITIONS, LEGACY_PEERS, LEGACY_PEER_EVENTS,
LEGACY_TRANSFERRED_RANGES, LEGACY_AVAILABLE_RANGES, LEGACY_SIZE_ESTIMATES, LEGACY_SSTABLE_ACTIVITY,
METADATA_LOG, SNAPSHOT_TABLE_NAME, CONSENSUS_MIGRATION_STATE);
METADATA_LOG, METADATA_SNAPSHOTS, CONSENSUS_MIGRATION_STATE,
HOST_LOG_ID, SHARDS, COORDINATOR_LOGS);

public static final TableMetadata Batches =
parse(BATCHES,
Expand Down Expand Up @@ -516,7 +521,7 @@ private SystemKeyspace()
+ "cfids set<uuid>, "
+ "PRIMARY KEY (parent_id))").build();

public static final TableMetadata LocalMetadataLog =
private static final TableMetadata LocalMetadataLog =
parse(METADATA_LOG,
"Local Metadata Log",
"CREATE TABLE %s ("
Expand All @@ -530,13 +535,49 @@ private SystemKeyspace()
"compaction_window_size","1")))
.build();

public static final TableMetadata Snapshots = parse(SNAPSHOT_TABLE_NAME,
"ClusterMetadata snapshots",
"CREATE TABLE IF NOT EXISTS %s (" +
"epoch bigint PRIMARY KEY," +
"snapshot blob)")
.partitioner(MetaStrategy.partitioner)
.build();
/**
 * System table storing serialized ClusterMetadata snapshots, keyed by epoch.
 * Read/written by the storeSnapshot/getSnapshot/findSnapshotBefore helpers via
 * token(epoch) range queries, which rely on the custom MetaStrategy partitioner
 * set below (presumably a reverse-order partitioner — the callers note it does
 * not support negative keys; TODO confirm against MetaStrategy).
 */
private static final TableMetadata Snapshots =
parse(METADATA_SNAPSHOTS,
"ClusterMetadata snapshots",
"CREATE TABLE IF NOT EXISTS %s (" +
"epoch bigint PRIMARY KEY," +
"snapshot blob)")
.partitioner(MetaStrategy.partitioner)
.build();

/**
 * System table recording the last host log id used by mutation tracking.
 * Single-row layout: a text key partition with one int column
 * (host_log_id). NOTE(review): the exact key value written here is not
 * visible in this diff — verify against the writer.
 */
private static final TableMetadata HostLogId =
parse(HOST_LOG_ID,
"mutation tracking last used host log id",
"CREATE TABLE %s ("
+ "key text,"
+ "host_log_id int,"
+ "PRIMARY KEY ((key)))")
.build();

/**
 * System table describing mutation-tracking shards. Each shard is identified
 * by the composite partition key (keyspace_name, range_start, range_end) —
 * token range bounds stored as text — and carries its participant set as a
 * frozen set of ints (presumably host ids; confirm against the writer).
 */
private static final TableMetadata Shards =
parse(SHARDS,
"mutation tracking shards",
"CREATE TABLE %s ("
+ "keyspace_name text,"
+ "range_start text,"
+ "range_end text,"
+ "participants frozen<set<int>>,"
+ "PRIMARY KEY ((keyspace_name, range_start, range_end)))")
.build();

/**
 * System table holding per-shard coordinator log state for mutation tracking.
 * Partitioned by (keyspace_name, range_start, range_end) — the same shard key
 * shape as the SHARDS table — and clustered by (host_id, host_log_id), so a
 * shard can hold one row per coordinator log. Each row stores the log's
 * participant set plus witnessed/persisted offsets maps keyed by int
 * (presumably host id -> offset list; TODO confirm against the writer).
 */
private static final TableMetadata CoordinatorLogs =
parse(COORDINATOR_LOGS,
"mutation tracking coordinator logs",
"CREATE TABLE %s ("
+ "keyspace_name text,"
+ "range_start text,"
+ "range_end text,"
+ "host_id int,"
+ "host_log_id int,"
+ "participants frozen<set<int>>,"
+ "witnessed_offsets map<int, frozen<list<int>>>,"
+ "persisted_offsets map<int, frozen<list<int>>>,"
+ "PRIMARY KEY ((keyspace_name, range_start, range_end), host_id, host_log_id))")
.build();

@Deprecated(since = "4.0")
private static final TableMetadata LegacyPeers =
Expand Down Expand Up @@ -631,7 +672,10 @@ private static Tables tables()
TopPartitions,
LocalMetadataLog,
Snapshots,
ConsensusMigrationState);
ConsensusMigrationState,
HostLogId,
Shards,
CoordinatorLogs);
}

private static volatile Map<TableId, Pair<CommitLogPosition, Long>> truncationRecords;
Expand Down Expand Up @@ -2127,16 +2171,16 @@ public static void storeSnapshot(Epoch epoch, ByteBuffer snapshot)
{
Preconditions.checkArgument(epoch.isAfter(Epoch.FIRST), "Cannot store a snapshot for an epoch less than " + Epoch.FIRST.getEpoch());
logger.info("Storing snapshot of cluster metadata at epoch {}", epoch);
String query = String.format("INSERT INTO %s.%s (epoch, snapshot) VALUES (?, ?)", SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME);
String query = String.format("INSERT INTO %s.%s (epoch, snapshot) VALUES (?, ?)", SchemaConstants.SYSTEM_KEYSPACE_NAME, METADATA_SNAPSHOTS);
executeInternal(query, epoch.getEpoch(), snapshot);
forceBlockingFlush(SNAPSHOT_TABLE_NAME);
forceBlockingFlush(METADATA_SNAPSHOTS);
}

public static ByteBuffer getSnapshot(Epoch epoch)
{
Preconditions.checkArgument(epoch.isAfter(Epoch.FIRST), "Cannot retrieve a snapshot for an epoch less than " + Epoch.FIRST.getEpoch());
logger.info("Getting snapshot of epoch = {}", epoch);
String query = String.format("SELECT snapshot FROM %s.%s WHERE epoch = ?", SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME);
String query = String.format("SELECT snapshot FROM %s.%s WHERE epoch = ?", SchemaConstants.SYSTEM_KEYSPACE_NAME, METADATA_SNAPSHOTS);
UntypedResultSet res = executeInternal(query, epoch.getEpoch());
if (res == null || res.isEmpty())
return null;
Expand Down Expand Up @@ -2165,7 +2209,7 @@ public static ByteBuffer findSnapshotBefore(Epoch search)
{
// during gossip upgrade we have epoch = Long.MIN_VALUE + 1 (and the reverse partitioner doesn't support negative keys)
search = search.isBefore(Epoch.EMPTY) ? Epoch.EMPTY : search;
String query = String.format("SELECT snapshot FROM %s.%s WHERE token(epoch) >= token(?) LIMIT 1", SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME);
String query = String.format("SELECT snapshot FROM %s.%s WHERE token(epoch) >= token(?) LIMIT 1", SchemaConstants.SYSTEM_KEYSPACE_NAME, METADATA_SNAPSHOTS);
UntypedResultSet res = executeInternal(query, search.getEpoch());
if (res != null && !res.isEmpty())
return res.one().getBytes("snapshot").duplicate();
Expand All @@ -2176,7 +2220,7 @@ public static List<Epoch> listSnapshotsSince(Epoch search)
{
// during gossip upgrade we have epoch = Long.MIN_VALUE + 1 (and the reverse partitioner doesn't support negative keys)
search = search.isBefore(Epoch.EMPTY) ? Epoch.EMPTY : search;
String query = String.format("SELECT epoch FROM %s.%s WHERE token(epoch) < token(?)", SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME);
String query = String.format("SELECT epoch FROM %s.%s WHERE token(epoch) < token(?)", SchemaConstants.SYSTEM_KEYSPACE_NAME, METADATA_SNAPSHOTS);
UntypedResultSet res = executeInternal(query, search.getEpoch());
if (res == null)
return Collections.emptyList();
Expand All @@ -2189,7 +2233,7 @@ public static List<Epoch> listSnapshotsSince(Epoch search)
*/
public static ByteBuffer findLastSnapshot()
{
String query = String.format("SELECT snapshot FROM %s.%s LIMIT 1", SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME);
String query = String.format("SELECT snapshot FROM %s.%s LIMIT 1", SchemaConstants.SYSTEM_KEYSPACE_NAME, METADATA_SNAPSHOTS);
UntypedResultSet res = executeInternal(query);
if (res != null && !res.isEmpty())
return res.one().getBytes("snapshot").duplicate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ protected Map<MetadataType, MetadataComponent> finalizeMetadata()
if (metadata().replicationType().isTracked() && repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
{
Preconditions.checkState(Objects.equals(pendingRepair, ActiveRepairService.NO_PENDING_REPAIR));
if (MutationTrackingService.instance.isDurablyReconciled(getKeyspaceName(), coordinatorLogOffsets))
if (MutationTrackingService.instance.isDurablyReconciled(coordinatorLogOffsets))
{
repairedAt = Clock.Global.currentTimeMillis();
logger.debug("Marking SSTable {} as reconciled with repairedAt {}", descriptor, repairedAt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ public class BroadcastLogOffsets
private final String keyspace;
private final Range<Token> range;
private final List<Offsets.Immutable> replicatedOffsets;
private final boolean durable;

public BroadcastLogOffsets(String keyspace, Range<Token> range, List<Offsets.Immutable> offsets)
/**
 * Creates a broadcast message carrying replicated log offsets for one shard.
 *
 * @param keyspace name of the keyspace the shard belongs to
 * @param range    token range identifying the shard
 * @param offsets  immutable per-log offsets being broadcast; stored as
 *                 {@code replicatedOffsets}
 * @param durable  durability flag propagated to
 *                 {@code MutationTrackingService.updateReplicatedOffsets} on
 *                 the receiving side (see verbHandler below)
 */
public BroadcastLogOffsets(String keyspace, Range<Token> range, List<Offsets.Immutable> offsets, boolean durable)
{
this.keyspace = keyspace;
this.range = range;
this.replicatedOffsets = offsets;
this.durable = durable;
}

boolean isEmpty()
Expand All @@ -57,7 +59,7 @@ boolean isEmpty()
@Override
public String toString()
{
// Debug rendering: ShardReplicatedOffsets{keyspace, range, offsets, durable}.
// NOTE(review): the prefix "ShardReplicatedOffsets" does not match the class
// name BroadcastLogOffsets — looks like a leftover from a rename; confirm.
StringBuilder sb = new StringBuilder("ShardReplicatedOffsets{");
sb.append(keyspace).append(", ");
sb.append(range).append(", ");
sb.append(replicatedOffsets).append(", ");
sb.append(durable);
return sb.append('}').toString();
}

public static final IVerbHandler<BroadcastLogOffsets> verbHandler = message -> {
Expand All @@ -66,6 +68,7 @@ public String toString()
MutationTrackingService.instance.updateReplicatedOffsets(replicatedOffsets.keyspace,
replicatedOffsets.range,
replicatedOffsets.replicatedOffsets,
replicatedOffsets.durable,
message.from());
};

Expand All @@ -79,6 +82,7 @@ public void serialize(BroadcastLogOffsets status, DataOutputPlus out, int versio
out.writeInt(status.replicatedOffsets.size());
for (Offsets.Immutable logOffsets : status.replicatedOffsets)
Offsets.serializer.serialize(logOffsets, out, version);
out.writeBoolean(status.durable);
}

@Override
Expand All @@ -90,7 +94,8 @@ public BroadcastLogOffsets deserialize(DataInputPlus in, int version) throws IOE
List<Offsets.Immutable> replicatedOffsets = new ArrayList<>(count);
for (int i = 0; i < count; ++i)
replicatedOffsets.add(Offsets.serializer.deserialize(in, version));
return new BroadcastLogOffsets(keyspace, range, replicatedOffsets);
boolean durable = in.readBoolean();
return new BroadcastLogOffsets(keyspace, range, replicatedOffsets, durable);
}

@Override
Expand All @@ -102,6 +107,7 @@ public long serializedSize(BroadcastLogOffsets replicatedOffsets, int version)
size += TypeSizes.sizeof(replicatedOffsets.replicatedOffsets.size());
for (Offsets.Immutable logOffsets : replicatedOffsets.replicatedOffsets)
size += Offsets.serializer.serializedSize(logOffsets, version);
size += TypeSizes.sizeof(replicatedOffsets.durable);
return size;
}
};
Expand Down
Loading