diff --git a/src/java/org/apache/cassandra/schema/DistributedSchema.java b/src/java/org/apache/cassandra/schema/DistributedSchema.java index 0eceb9b169bf..e7045cd9b83e 100644 --- a/src/java/org/apache/cassandra/schema/DistributedSchema.java +++ b/src/java/org/apache/cassandra/schema/DistributedSchema.java @@ -20,6 +20,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableList; import org.apache.cassandra.auth.AuthKeyspace; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.functions.UserFunction; @@ -34,6 +35,7 @@ import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.tracing.TraceKeyspace; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; import java.io.IOException; import java.util.ArrayList; @@ -151,14 +153,25 @@ public boolean hasAccordKeyspaces() return keyspaces.stream().anyMatch(ksm -> ksm.tables.stream().anyMatch(TableMetadata::requiresAccordSupport)); } + /** + * @deprecated since TCM, used on upgrade from gossip to populate system schema tables with the correct generation + */ + @Deprecated(since = "TCM") + public static List> distributedKeyspacesWithGeneration(Set knownDatacenters) + { + return ImmutableList.of(Pair.create(DistributedMetadataLogKeyspace.initialMetadata(knownDatacenters), DistributedMetadataLogKeyspace.GENERATION), + Pair.create(TraceKeyspace.metadata(), TraceKeyspace.GENERATION), + Pair.create(SystemDistributedKeyspace.metadata(), SystemDistributedKeyspace.GENERATION), + Pair.create(AuthKeyspace.metadata(),AuthKeyspace.GENERATION)); + } + public static DistributedSchema fromSystemTables(Keyspaces keyspaces, Set knownDatacenters) { if (!keyspaces.containsKeyspace(SchemaConstants.METADATA_KEYSPACE_NAME)) { - Keyspaces kss = Keyspaces.of(DistributedMetadataLogKeyspace.initialMetadata(knownDatacenters), - TraceKeyspace.metadata(), - SystemDistributedKeyspace.metadata(), - AuthKeyspace.metadata()); + Keyspaces kss = Keyspaces.none(); + for (Pair ksmGen : distributedKeyspacesWithGeneration(knownDatacenters)) + kss = kss.with(ksmGen.left); for (KeyspaceMetadata ksm : keyspaces) // on disk keyspaces kss = kss.withAddedOrUpdated(kss.get(ksm.name) .map(k -> merged(ksm, k)) diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index bb660b984500..fe1033e42965 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -18,8 +18,10 @@ package org.apache.cassandra.tcm; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -37,6 +39,7 @@ import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.dht.BootStrapper; @@ -49,8 +52,11 @@ import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.schema.SchemaKeyspace; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tcm.log.LocalLog; @@ -69,6 +75,7 @@ import org.apache.cassandra.tcm.transformations.UnsafeJoin; import org.apache.cassandra.tcm.transformations.cms.Initialize; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; import static org.apache.cassandra.tcm.ClusterMetadataService.State.LOCAL; import static org.apache.cassandra.tcm.compatibility.GossipHelper.emptyWithSchemaFromSystemTables; @@ -255,12 +262,26 @@ public static void initializeForDiscovery(Runnable initMessaging) Election.instance.migrated(); } + private static void updateSystemSchemaTables(Set knownDatacenters) + { + List> kss = DistributedSchema.distributedKeyspacesWithGeneration(knownDatacenters); + List mutations = new ArrayList<>(); + for (Pair ksm : kss) + { + Keyspaces.KeyspacesDiff ksDiff = Keyspaces.diff(Keyspaces.none(), Keyspaces.of(ksm.left)); + mutations.addAll(SchemaKeyspace.convertSchemaDiffToMutations(ksDiff, ksm.right)); + } + SchemaKeyspace.applyChanges(mutations); + } + /** * This should only be called during startup. */ public static void initializeFromGossip(Function wrapProcessor, Runnable initMessaging) throws StartupException { - ClusterMetadata emptyFromSystemTables = emptyWithSchemaFromSystemTables(SystemKeyspace.allKnownDatacenters()); + Set knownDcs = SystemKeyspace.allKnownDatacenters(); + updateSystemSchemaTables(knownDcs); + ClusterMetadata emptyFromSystemTables = emptyWithSchemaFromSystemTables(knownDcs); LocalLog.LogSpec logSpec = LocalLog.logSpec() .withInitialState(emptyFromSystemTables) .afterReplay(Startup::scrubDataDirectories, diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java index eb717602f36b..c7e33b822837 100644 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java @@ -35,6 +35,7 @@ import org.apache.cassandra.tcm.membership.NodeAddresses; import org.apache.cassandra.tcm.membership.NodeId; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.psjava.util.AssertStatus.assertTrue; @@ -54,6 +55,11 @@ public void simpleUpgradeTest() throws Throwable cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); }) .runAfterClusterUpgrade((cluster) -> { + Object [][] r = cluster.get(1).executeInternal("select keyspace_name from system_schema.keyspaces where keyspace_name='system_cluster_metadata'"); + assertEquals(1, r.length); + r = cluster.get(1).executeInternal("select table_name from system_schema.tables where keyspace_name='system_cluster_metadata' and table_name='distributed_metadata_log'"); + assertEquals(1, r.length); + cluster.get(1).nodetoolResult("cms","initialize").asserts().success(); cluster.forEach(i -> {