From 4af985f898a2894b3e06219a0419a6516583dd2c Mon Sep 17 00:00:00 2001 From: zhaolong Date: Tue, 28 May 2019 14:56:57 +0200 Subject: [PATCH] read brokers and partition size json from file instead of Zookeeper 1) stream all partitions metrics to DD is too expensive, read from file 2) zookeeper only allows 1MB size for ZNODE, too small to store all our partition infos --- cmd/topicmappr/commands/metadata.go | 51 ++++++++++++++++++++++++++++- cmd/topicmappr/commands/rebuild.go | 9 +++-- cmd/topicmappr/main.go | 2 +- 3 files changed, 58 insertions(+), 4 deletions(-) diff --git a/cmd/topicmappr/commands/metadata.go b/cmd/topicmappr/commands/metadata.go index 499a2b3a..0f7f9c86 100644 --- a/cmd/topicmappr/commands/metadata.go +++ b/cmd/topicmappr/commands/metadata.go @@ -4,6 +4,8 @@ import ( "fmt" "os" "time" + "io/ioutil" + "encoding/json" "github.com/DataDog/kafka-kit/kafkazk" @@ -42,6 +44,35 @@ func getBrokerMeta(cmd *cobra.Command, zk kafkazk.Handler, m bool) kafkazk.Broke } os.Exit(1) } + // Get a broker map of the brokers in the current partition map. + // If meta data isn't being looked up, brokerMeta will be empty. + bmif, _ := cmd.Flags().GetString("brokers-storage-in-file") + if bmif != "" { + jsonFile, err := os.Open(bmif) + // if we os.Open returns an error then handle it + if err != nil { + fmt.Printf("Error on %s",err) + os.Exit(1) + } + // defer the closing of our jsonFile so that we can parse it later on + defer jsonFile.Close() + data, _ := ioutil.ReadAll(jsonFile) + bmm := kafkazk.BrokerMetricsMap{} + err = json.Unmarshal(data, &bmm) + if err != nil { + fmt.Errorf("Error unmarshalling broker metrics: %s", err.Error()) + os.Exit(1) + } + // Populate each broker with + // metric data. + for bid := range brokerMeta { + m, exists := bmm[bid] + if exists { + brokerMeta[bid].StorageFree = m.StorageFree + brokerMeta[bid].MetricsIncomplete = false + } + } + } return brokerMeta } @@ -70,6 +101,24 @@ func getPartitionMeta(cmd *cobra.Command, zk kafkazk.Handler) kafkazk.PartitionM fmt.Println(err) os.Exit(1) } - + // Get a the partitionMetaMap from input file + psif, _ := cmd.Flags().GetString("partitions-size-in-file") + if psif != "" { + jsonFile, err := os.Open(psif) + // if we os.Open returns an error then handle it + if err != nil { + fmt.Printf("Error on %s", err) + os.Exit(1) + } + // defer the closing of our jsonFile so that we can parse it later on + defer jsonFile.Close() + data, _ := ioutil.ReadAll(jsonFile) + err = json.Unmarshal(data, &partitionMeta) + if err != nil { + fmt.Errorf("Error unmarshalling broker metrics: %s", err.Error()) + os.Exit(1) + } + } + //fmt.Println(partitionMeta) return partitionMeta } diff --git a/cmd/topicmappr/commands/rebuild.go b/cmd/topicmappr/commands/rebuild.go index e2cada1f..9987a146 100644 --- a/cmd/topicmappr/commands/rebuild.go +++ b/cmd/topicmappr/commands/rebuild.go @@ -29,6 +29,8 @@ func init() { rebuildCmd.Flags().Bool("use-meta", true, "Use broker metadata in placement constraints") rebuildCmd.Flags().String("out-path", "", "Path to write output map files to") rebuildCmd.Flags().String("out-file", "", "If defined, write a combined map of all topics to a file") + rebuildCmd.Flags().String("partitions-size-in-file", "", "Read Topics partitions sizes from a file") + rebuildCmd.Flags().String("brokers-storage-in-file", "", "Read Brokers free storage from a file") rebuildCmd.Flags().Bool("force-rebuild", false, "Forces a complete map rebuild") rebuildCmd.Flags().Int("replication", 0, "Normalize the topic replication factor across all replica sets (0 results in a no-op)") rebuildCmd.Flags().Bool("sub-affinity", false, "Replacement broker substitution affinity") @@ -54,7 +56,8 @@ func rebuild(cmd *cobra.Command, _ []string) { fr, _ := cmd.Flags().GetBool("force-rebuild") sa, _ := cmd.Flags().GetBool("sub-affinity") m, _ := cmd.Flags().GetBool("use-meta") - + bsif, _ := cmd.Flags().GetString("brokers-storage-in-file") + psif, _ := cmd.Flags().GetString("partitions-size-in-file") switch { case ms == "" && t == "": fmt.Println("\n[ERROR] must specify either --topics or --map-string") @@ -103,7 +106,9 @@ func rebuild(cmd *cobra.Command, _ []string) { // Fetch broker metadata. var withMetrics bool if cmd.Flag("placement").Value.String() == "storage" { - checkMetaAge(cmd, zk) + if bsif == "" || psif == "" { + checkMetaAge(cmd, zk) + } withMetrics = true } diff --git a/cmd/topicmappr/main.go b/cmd/topicmappr/main.go index c28d56e6..f8da961e 100644 --- a/cmd/topicmappr/main.go +++ b/cmd/topicmappr/main.go @@ -1,6 +1,6 @@ package main -import "github.com/DataDog/kafka-kit/cmd/topicmappr/commands" +import "kafka-kit/cmd/topicmappr/commands" func main() { commands.Execute()