diff --git a/.travis.yml b/.travis.yml index b165b91..982e21b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,6 +7,13 @@ env: - GO111MODULE=on gobuild_args: -a -tags netgo -ldflags '-w' go_import_path: github.com/adevinta/vulcan-tracker -script: - - go install ./... - - go test -short -v $(go list ./... | grep -v /vendor/) + +jobs: + include: + # "test" stage. + - stage: "test" + name: "test" + script: ./_script/test -cover ./... + # script: + # - go install ./... + # - go test -short -v $(go list ./... | grep -v /vendor/) diff --git a/README.md b/README.md index d761c65..d9dec35 100644 --- a/README.md +++ b/README.md @@ -33,3 +33,21 @@ For running the component locally, clone and run at the root of the repo the fol ```sh go install ./... ``` + +## Test + +Execute the tests: + +``` +_script/test -cover ./... +``` + +`_script/test` makes sure the testing infrastructure is up and running and then +runs `go test` with the provided arguments. It also disables test caching and +avoids running multiple test programs in parallel. + +Stop the testing infrastructure: + +``` +_script/clean +``` \ No newline at end of file diff --git a/_resources/config/local.toml b/_resources/config/local.toml index 1ece285..3d8c860 100644 --- a/_resources/config/local.toml +++ b/_resources/config/local.toml @@ -6,6 +6,14 @@ port = 8080 [log] level = "DEBUG" +[kafka] +user = "user" +pass = "supersecret" +broker = "localhost:9092" +# Time between retries on failure (0 means no retries). +retry_duration="5s" +topics = {assets = "test-topic"} + [servers] [servers.example1] url = "http://localhost:8080" @@ -20,3 +28,5 @@ level = "DEBUG" project = "TEST" fix_workflow = ["ToDo", "In Progress", "Under Review", "Fixed"] wontfix_workflow = ["Won't Fix"] + + diff --git a/_script/clean b/_script/clean new file mode 100755 index 0000000..501b1ee --- /dev/null +++ b/_script/clean @@ -0,0 +1,8 @@ +#!/bin/bash + +set -e -u + +# Set working directory to /_script. +cd "$(dirname $0)" + +docker-compose -p 'vulcan-tracker' rm -s -f diff --git a/_script/docker-compose.yml b/_script/docker-compose.yml new file mode 100644 index 0000000..45ae2c8 --- /dev/null +++ b/_script/docker-compose.yml @@ -0,0 +1,49 @@ +version: "2.4" + +services: + setup: + image: busybox + depends_on: + # The service_healthy condition is not honored by "docker-compose run" in + # all docker-compose versions, but it is respected if the condition is in + # a transitive dependency. Thus, we have created an intermediate + # dependency with all the required conditions. + - healthchecks + entrypoint: ["echo", "setup done"] + + healthchecks: + image: busybox + depends_on: + kafka: + condition: service_healthy + entrypoint: ["echo", "healthchecks done"] + + kafka: + image: confluentinc/cp-kafka:7.2.2 + ports: + - 127.0.0.1:9092:9092 + expose: + - 29092 + environment: + - KAFKA_BROKER_ID=1 + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://127.0.0.1:9092 + - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT + - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 + healthcheck: + test: ["CMD", "nc", "-z", "kafka", "9092"] + interval: 5s + timeout: 10s + retries: 6 + depends_on: + zookeeper: + condition: service_started + + zookeeper: + image: confluentinc/cp-zookeeper:7.2.2 + expose: + - 2181 + environment: + - ZOOKEEPER_CLIENT_PORT=2181 + - ZOOKEEPER_TICK_TIME=2000 diff --git a/_script/setup b/_script/setup new file mode 100755 index 0000000..85c39d9 --- /dev/null +++ b/_script/setup @@ -0,0 +1,8 @@ +#!/bin/bash + +set -e -u + +# Set working directory to /_script. +cd "$(dirname $0)" + +docker-compose -p 'vulcan-tracker' run setup diff --git a/_script/test b/_script/test new file mode 100755 index 0000000..18462e3 --- /dev/null +++ b/_script/test @@ -0,0 +1,14 @@ +#!/bin/bash + +set -e -u + +# Set working directory to the root of the repo. +cd "$(dirname $0)/.." + +# Start testing infrastructure (i.e. kafka and graph-asset-inventory-api). +./_script/setup + +# Disable the test cache (-count=1), so tests always connect to the testing +# infrastructure. Also, do not run multiple test programs in parallel (-p=1), +# so there are no race conditions between tests in different packages. +exec go test -count=1 -p=1 "$@" diff --git a/cmd/vulcan-tracker/main.go b/cmd/vulcan-tracker/main.go index 3e79427..562c6d4 100644 --- a/cmd/vulcan-tracker/main.go +++ b/cmd/vulcan-tracker/main.go @@ -1,19 +1,24 @@ /* Copyright 2022 Adevinta */ - package main import ( + "context" "flag" "fmt" - "log" "strings" + "time" "github.com/adevinta/vulcan-tracker/pkg/api" "github.com/adevinta/vulcan-tracker/pkg/config" + "github.com/adevinta/vulcan-tracker/pkg/events" + "github.com/adevinta/vulcan-tracker/pkg/events/kafka" + "github.com/adevinta/vulcan-tracker/pkg/log" + "github.com/adevinta/vulcan-tracker/pkg/model" "github.com/adevinta/vulcan-tracker/pkg/storage" "github.com/adevinta/vulcan-tracker/pkg/tracking" + "github.com/adevinta/vulcan-tracker/pkg/vulcan" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" ) @@ -27,6 +32,10 @@ func main() { log.Fatalf("Error reading configuration: %v", err) } + if err := log.SetLevel(strings.ToLower(cfg.Log.Level)); err != nil { + log.Fatalf("error setting log level: %v", err) + } + e := echo.New() e.Logger.SetLevel(config.ParseLogLvl(cfg.Log.Level)) @@ -68,7 +77,135 @@ func main() { e.POST("/:team_id/tickets/:id/fix", a.FixTicket) e.POST("/:team_id/tickets/:id/wontfix", a.WontFixTicket) + errs := make(chan error, 1) + + go func() { + if err := runEventReader(context.Background(), cfg.Kafka, storage, trackerServers); err != nil { + errs <- fmt.Errorf("vulcan-tracker: %v", err) + } + // close(errs) + }() + if err := <-errs; err != nil { + // handle error + log.Error.Fatal(err) + } + address := fmt.Sprintf(":%d", cfg.API.Port) e.Logger.Fatal(e.Start(address)) } + +// runEventReader is invoked by main and does the actual work of reading events. +func runEventReader(ctx context.Context, kCfg config.KafkaConfig, s storage.Storage, ts map[string]tracking.TicketTracker) error { + + kcfg := map[string]any{ + "bootstrap.servers": kCfg.KafkaBroker, + "group.id": kCfg.KafkaGroupID, + "auto.offset.reset": "earliest", + "enable.partition.eof": true, + } + + if kCfg.KafkaUsername != "" && kCfg.KafkaPassword != "" { + kcfg["security.protocol"] = "sasl_ssl" + kcfg["sasl.mechanisms"] = "SCRAM-SHA-256" + kcfg["sasl.username"] = kCfg.KafkaUsername + kcfg["sasl.password"] = kCfg.KafkaPassword + } + + proc, err := kafka.NewProcessor(kcfg) + if err != nil { + return fmt.Errorf("error creating kafka processor: %w", err) + } + defer proc.Close() + + vcli := vulcan.NewClient(proc) + for { + log.Info.Println("vulcan-tracker: processing findings") + + select { + case <-ctx.Done(): + log.Info.Println("vulcan-tracker: context is done") + return nil + default: + } + + if err := vcli.ProcessFindings(ctx, findingHandler(s, ts)); err != nil { + err = fmt.Errorf("error processing findings: %w", err) + if kCfg.RetryDuration == 0 { + return err + } + log.Error.Printf("vulcan-tracker: %v", err) + } + + log.Info.Printf("vulcan-tracker: retrying in %v", kCfg.RetryDuration) + time.Sleep(kCfg.RetryDuration) + } + +} + +// findingHandler processes finding events coming from a stream. +func findingHandler(s storage.Storage, ts map[string]tracking.TicketTracker) vulcan.FindingHandler { + return func(payload events.FindingNotification, isNil bool) error { + log.Debug.Printf("vulcan-tracker: payload=%#v isNil=%v", payload, isNil) + for _, teamId := range payload.Target.Teams { + pConfig, err := s.ProjectConfig(teamId) + if err != nil { + // The team doesn't have a configuration in vulcan-tracker. We'll skip its findings. + continue + } + if !pConfig.AutoCreate { + // This team doesn't have enable the auto create configuration. + continue + } + err = upsertTicket(pConfig, ts[pConfig.ServerName], payload) + if err != nil { + return fmt.Errorf("could not upsert finding: %w", err) + } + } + return nil + } +} + +func upsertTicket(pConfig *model.ProjectConfig, tt tracking.TicketTracker, payload events.FindingNotification) error { + // Check if there is a ticket for this finding. + ticket, err := tt.FindTicketByFindingID(pConfig.Project, pConfig.VulnerabilityIssueType, payload.ID) + if err != nil { + return fmt.Errorf("error searching the finding %s: %w", payload.ID, err) + } + + // Create the ticket. + if ticket == nil { + // TODO: Define a good summary and description. + ticket = &model.Ticket{ + Project: pConfig.Project, + TicketType: pConfig.VulnerabilityIssueType, + Summary: payload.Issue.Summary, + Description: fmt.Sprintf("FindingID: %s", payload.ID), + } + ticket, err := tt.CreateTicket(ticket) + if err != nil { + return fmt.Errorf("could not create a ticket: %w", err) + } + log.Debug.Printf("vulcan-tracker: created ticket %s ", ticket.Key) + + return nil + + } else { + // Update the ticket. + switch status := payload.Status; status { + + case "FIXED": + _, err := tt.FixTicket(ticket.Key, pConfig.FixedWorkflow) + if err != nil { + return err + } + case "FALSE POSITIVE": + _, err := tt.WontFixTicket(ticket.Key, pConfig.WontFixWorkflow, "Won't Do") + if err != nil { + return err + } + } + } + + return nil +} diff --git a/config.toml b/config.toml index dea4d0a..df95645 100644 --- a/config.toml +++ b/config.toml @@ -12,3 +12,9 @@ user = "$JIRA_USER" token = "$JIRA_TOKEN" vulnerability_issue_type = "$JIRA_VULNERABILITY_ISSUE_TYPE" project = "$JIRA_PROJECT" + +[kafka] +user = "user" +pass = "supersecret" +broker = "localhost:9092" +topic = "findings-v0" \ No newline at end of file diff --git a/go.mod b/go.mod index bf55953..cbe036e 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( require ( github.com/BurntSushi/toml v1.2.0 github.com/andygrunwald/go-jira v1.16.0 + github.com/confluentinc/confluent-kafka-go v1.9.2 github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/google/go-cmp v0.5.9 github.com/labstack/gommon v0.3.1 diff --git a/go.sum b/go.sum index 0e4f626..ac58178 100644 --- a/go.sum +++ b/go.sum @@ -1,38 +1,141 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v1.2.0 h1:Rt8g24XnyGTyglgET/PRUNlrUeu9F5L+7FilkXfZgs0= github.com/BurntSushi/toml v1.2.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/actgardner/gogen-avro/v10 v10.1.0/go.mod h1:o+ybmVjEa27AAr35FRqU98DJu1fXES56uXniYFv4yDA= +github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ= +github.com/actgardner/gogen-avro/v9 v9.1.0/go.mod h1:nyTj6wPqDJoxM3qdnjcLv+EnMDSDFqE0qDpva2QRmKc= github.com/andygrunwald/go-jira v1.16.0 h1:PU7C7Fkk5L96JvPc6vDVIrd99vdPnYudHu4ju2c2ikQ= github.com/andygrunwald/go-jira v1.16.0/go.mod h1:UQH4IBVxIYWbgagc0LF/k9FRs9xjIiQ8hIcC6HfLwFU= +github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= +github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/confluentinc/confluent-kafka-go v1.9.2 h1:gV/GxhMBUb03tFWkN+7kdhg+zf+QUM+wVkI9zwh770Q= +github.com/confluentinc/confluent-kafka-go v1.9.2/go.mod h1:ptXNqsuDfYbAE/LBW6pnwWZElUoWxHoV8E43DCrliyo= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= +github.com/frankban/quicktest v1.2.2/go.mod h1:Qh/WofXFeiAFII1aEBu529AtJo6Zg2VHscnEsbBnJ20= +github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o= +github.com/frankban/quicktest v1.10.0/go.mod h1:ui7WezCLWMWxVWr1GETZY3smRy0G4KWq9vcPtJmFl7Y= +github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs= github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.2.1-0.20190312032427-6f77996f0c42/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8= github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20211008130755-947d60d73cc0/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/hamba/avro v1.5.6/go.mod h1:3vNT0RLXXpFm2Tb/5KC71ZRJlOroggq1Rcitb6k4Fr8= +github.com/heetch/avro v0.3.1/go.mod h1:4xn38Oz/+hiEUTpbVfGVLfvOg0yKLlRP7Q9+gJJILgA= +github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0/go.mod h1:N0Wam8K1arqPXNWjMo21EXnBPOPp36vB07FNRdD2geA= +github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= +github.com/invopop/jsonschema v0.4.0/go.mod h1:O9uiLokuu0+MGFlyiaqtWxwqJm41/+8Nj0lD7A36YH0= +github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI= +github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI= +github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ= +github.com/jhump/protoreflect v1.11.0/go.mod h1:U7aMIjN0NWq9swDP7xDdoMfRHb35uiuTd3Z9nFXJf5E= +github.com/jhump/protoreflect v1.12.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI= +github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/juju/qthttptest v0.1.1/go.mod h1:aTlAv8TYaflIiTDIQYzxnl1QdPjAg8Q8qJMErpKy6A4= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/labstack/echo/v4 v4.9.0 h1:wPOF1CE6gvt/kmbMR4dGzWvHMPT+sAEUJOwOTtvITVY= github.com/labstack/echo/v4 v4.9.0/go.mod h1:xkCDAdFCIf8jsFQ5NnbK7oqaF/yU1A1X20Ltm0OvSks= github.com/labstack/gommon v0.3.1 h1:OomWaJXm7xR6L1HmEtGyQf26TEn7V6X88mktX9kee9o= github.com/labstack/gommon v0.3.1/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3MFxTMTM= +github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM= +github.com/linkedin/goavro/v2 v2.10.0/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= +github.com/linkedin/goavro/v2 v2.10.1/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= +github.com/linkedin/goavro/v2 v2.11.1/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= github.com/mattn/go-colorable v0.1.11 h1:nQ+aFkoE2TMGc0b68U2OKSexC+eq46+XwZzWXHRmPYs= github.com/mattn/go-colorable v0.1.11/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/nrwiersma/avro-benchmarks v0.0.0-20210913175520-21aec48c8f76/go.mod h1:iKyFMidsk/sVYONJRE372sJuX/QTRPacU7imPqqsu7g= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rogpeppe/clock v0.0.0-20190514195947-2896927a307a/go.mod h1:4r5QyqhjIWCcK8DO4KMclc5Iknq5qVBAlbYYzAbUScQ= +github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= +github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.3.1-0.20190311161405-34c6fa2dc709/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -45,25 +148,122 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.2.1 h1:TVEnxayobAdVkhQfrfes2IzOB6o+z4roRkPF52WA1u4= github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 h1:HWj/xjIHfjYU5nVXpTM0s39J9CbLn7Cc5a7IC5rwsMQ= golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200505041828-1ed23360d12c/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f h1:OfiFi4JbukWwe3lzw+xunroH1mnC1e2Gy5cxNJApiSY= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220330033206-e17cdc41300f h1:rlezHXNlxYWvBCzNses9Dlc7nGFaNMJeqLolcmQSSZY= golang.org/x/sys v0.0.0-20220330033206-e17cdc41300f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 h1:Hir2P/De0WpUhtrKGGjvSb2YxUgyZ7EFOSLIcSSpiwE= golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200505023115-26f46d2f7ef8/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20220503193339-ba3ae3f07e29/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= +google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= +google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/avro.v0 v0.0.0-20171217001914-a730b5802183/go.mod h1:FvqrFXt+jCsyQibeRv4xxEJBL5iG2DDW5aeJwzDiq4A= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v1 v1.0.0/go.mod h1:CxwszS/Xz1C49Ucd2i6Zil5UToP1EmyrFhKaMVbg1mk= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/httprequest.v1 v1.2.1/go.mod h1:x2Otw96yda5+8+6ZeWwHIJTFkEHWP/qP8pJOzqEtWPM= +gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= +gopkg.in/retry.v1 v1.0.3/go.mod h1:FJkXmWiMaAo7xB+xhvDF59zhfjDWyzmyAxiT4dB688g= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/api/api.go b/pkg/api/api.go index 26e51f2..23c2c74 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -1,7 +1,6 @@ /* Copyright 2022 Adevinta */ - package api import ( diff --git a/pkg/config/config.go b/pkg/config/config.go index 1ba7593..8a03b84 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -1,13 +1,14 @@ /* Copyright 2022 Adevinta */ - package config import ( + "fmt" "io" "os" "strconv" + "time" "github.com/BurntSushi/toml" "github.com/labstack/gommon/log" @@ -26,12 +27,23 @@ type Team struct { VulnerabilityIssueType string `toml:"vulnerability_issue_type"` FixWorkflow []string `toml:"fix_workflow"` WontFixWorkflow []string `toml:"wontfix_workflow"` + AutoCreate bool `toml:"auto_create"` // create tickets automatically from a stream + +} + +type KafkaConfig struct { + RetryDuration time.Duration `toml:"retry_duration"` + KafkaBroker string `toml:"broker"` + KafkaGroupID string `toml:"group_id"` + KafkaUsername string `toml:"username"` + KafkaPassword string `toml:"password"` } type Config struct { API apiConfig `toml:"api"` Servers map[string]Server `toml:"servers"` Teams map[string]Team `toml:"teams"` + Kafka KafkaConfig `toml:"kafka"` Log logConfig `toml:"log"` } @@ -63,11 +75,34 @@ func ParseConfig(cfgFilePath string) (*Config, error) { } if envVar := os.Getenv("PORT"); envVar != "" { - entVarInt, err := strconv.Atoi(envVar) + envVarInt, err := strconv.Atoi(envVar) if err != nil { return nil, err } - conf.API.Port = entVarInt + conf.API.Port = envVarInt + } + + if envVar := os.Getenv("KAFKA_BROKER"); envVar != "" { + conf.Kafka.KafkaBroker = envVar + } + + if envVar := os.Getenv("RETRY_DURATION"); envVar != "" { + conf.Kafka.RetryDuration, err = time.ParseDuration(envVar) + if err != nil { + return nil, fmt.Errorf("invalid retry duration: %w", err) + } + } + + if envVar := os.Getenv("KAFKA_GROUP_ID"); envVar != "" { + conf.Kafka.KafkaGroupID = envVar + } + + if envVar := os.Getenv("KAFKA_USERNAME"); envVar != "" { + conf.Kafka.KafkaUsername = envVar + } + + if envVar := os.Getenv("KAFKA_PASSWORD"); envVar != "" { + conf.Kafka.KafkaPassword = envVar } return &conf, nil diff --git a/pkg/events/kafka/client.go b/pkg/events/kafka/client.go new file mode 100644 index 0000000..77d04c2 --- /dev/null +++ b/pkg/events/kafka/client.go @@ -0,0 +1,105 @@ +/* +Copyright 2022 Adevinta +*/ +package kafka + +import ( + "context" + "fmt" + "time" + + "github.com/adevinta/vulcan-tracker/pkg/events" + "github.com/confluentinc/confluent-kafka-go/kafka" +) + +// A Processor allows to process messages from a kafka topic. +type Processor struct { + c *kafka.Consumer +} + +// NewProcessor returns a [Processor] with the provided kafka +// configuration properties. +func NewProcessor(config map[string]any) (Processor, error) { + kconfig := make(kafka.ConfigMap) + for k, v := range config { + if err := kconfig.SetKey(k, v); err != nil { + return Processor{}, fmt.Errorf("could not set config key: %w", err) + } + } + kconfig["enable.auto.commit"] = false + kconfig["enable.auto.offset.store"] = false + + c, err := kafka.NewConsumer(&kconfig) + if err != nil { + return Processor{}, fmt.Errorf("failed to create a consumer: %w", err) + } + + return Processor{c}, nil +} + +// Process processes the messages received in the topic called entity by +// calling h. This method blocks the calling goroutine until the specified +// context is cancelled or an error occurs. It replaces the current kafka +// subscription, so it should not be called concurrently. +func (proc Processor) Process(ctx context.Context, entity string, h events.MsgHandler) error { + if err := proc.c.Subscribe(entity, nil); err != nil { + return fmt.Errorf("failed to subscribe to topic %w", err) + } + + for { + select { + case <-ctx.Done(): + return nil + default: + } + + kmsg, err := proc.c.ReadMessage(100 * time.Millisecond) + if err != nil { + kerr, ok := err.(kafka.Error) + if ok && kerr.Code() == kafka.ErrTimedOut { + continue + } + return fmt.Errorf("error reading message: %w", kerr) + } + + msg := events.Message{ + Key: kmsg.Key, + Value: kmsg.Value, + } + + for _, hdr := range kmsg.Headers { + entry := events.MetadataEntry{ + Key: []byte(hdr.Key), + Value: hdr.Value, + } + msg.Headers = append(msg.Headers, entry) + } + + if err := h(msg); err != nil { + return fmt.Errorf("error processing message: %w", err) + } + + if err := proc.Commit(*kmsg); err != nil { + return fmt.Errorf("error commiting message: %w", err) + } + } +} + +// Close closes the underlaying kafka consumer. +func (proc Processor) Close() error { + return proc.c.Close() +} + +func (proc Processor) Commit(kmsg kafka.Message) error { + _, err := proc.c.CommitOffsets([]kafka.TopicPartition{ + { + Topic: kmsg.TopicPartition.Topic, + Partition: kmsg.TopicPartition.Partition, + Offset: kmsg.TopicPartition.Offset + 1, + }, + }) + if err != nil { + return err + } + return nil +} diff --git a/pkg/events/kafka/client_test.go b/pkg/events/kafka/client_test.go new file mode 100644 index 0000000..719f8e4 --- /dev/null +++ b/pkg/events/kafka/client_test.go @@ -0,0 +1,212 @@ +/* +Copyright 2022 Adevinta +*/ +package kafka + +import ( + "context" + "errors" + "fmt" + "math/rand" + "strconv" + "testing" + "time" + + "github.com/adevinta/vulcan-tracker/pkg/events" + "github.com/adevinta/vulcan-tracker/pkg/events/streamtest" + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/google/go-cmp/cmp" +) + +const ( + bootstrapServers = "127.0.0.1:9092" + groupPrefix = "stream_kafka_kafka_test_group_" + topicPrefix = "stream_kafka_kafka_test_topic_" + messagesFile = "testdata/messages.json" + timeout = 5 * time.Minute +) + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +func setupKafka(topic string) (msgs []events.Message, err error) { + cfg := &kafka.ConfigMap{ + "bootstrap.servers": bootstrapServers, + + // Set message timeout to 5s, so the kafka client returns an + // error if the broker is not up. + "message.timeout.ms": 5000, + } + + prod, err := kafka.NewProducer(cfg) + if err != nil { + return nil, fmt.Errorf("error creating producer: %w", err) + } + defer prod.Close() + + msgs = streamtest.MustParse(messagesFile) + for _, msg := range msgs { + if err := produceMessage(prod, topic, msg); err != nil { + return nil, fmt.Errorf("error producing message: %w", err) + } + } + + return msgs, nil +} + +func produceMessage(prod *kafka.Producer, topic string, msg events.Message) error { + events := make(chan kafka.Event) + defer close(events) + + kmsg := &kafka.Message{ + Key: msg.Key, + Value: msg.Value, + TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, + } + + for _, e := range msg.Headers { + hdr := kafka.Header{ + Key: string(e.Key), + Value: e.Value, + } + kmsg.Headers = append(kmsg.Headers, hdr) + } + + if err := prod.Produce(kmsg, events); err != nil { + return fmt.Errorf("failed to produce message: %w", err) + } + + e := <-events + kmsg, ok := e.(*kafka.Message) + if !ok { + return errors.New("event type is not *events.Message") + } + if kmsg.TopicPartition.Error != nil { + return fmt.Errorf("could not deliver message: %w", kmsg.TopicPartition.Error) + } + + return nil +} + +func TestProcessorProcess(t *testing.T) { + topic := topicPrefix + strconv.FormatInt(rand.Int63(), 16) + + want, err := setupKafka(topic) + if err != nil { + t.Fatalf("error setting up kafka: %v", err) + } + + cfg := map[string]any{ + "bootstrap.servers": bootstrapServers, + "group.id": groupPrefix + strconv.FormatInt(rand.Int63(), 16), + "auto.commit.interval.ms": 100, + "auto.offset.reset": "earliest", + } + + proc, err := NewProcessor(cfg) + if err != nil { + t.Fatalf("error creating kafka processor: %v", err) + } + defer proc.Close() + + var ( + ctr int + got []events.Message + ) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + err = proc.Process(ctx, topic, func(msg events.Message) error { + got = append(got, msg) + + ctr++ + if ctr >= len(want) { + cancel() + } + + return nil + }) + if err != nil { + t.Fatalf("error processing messages: %v", err) + } + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("messages mismatch (-want +got):\n%v", diff) + } +} + +func TestProcessorProcessAtLeastOnce(t *testing.T) { + // Number of messages to process before error. + const n = 2 + + topic := topicPrefix + strconv.FormatInt(rand.Int63(), 16) + + want, err := setupKafka(topic) + if err != nil { + t.Fatalf("error setting up kafka: %v", err) + } + + if n > len(want) { + t.Fatal("n > testdata length") + } + + cfg := map[string]any{ + "bootstrap.servers": bootstrapServers, + "group.id": groupPrefix + strconv.FormatInt(rand.Int63(), 16), + "auto.offset.reset": "earliest", + } + + proc, err := NewProcessor(cfg) + if err != nil { + t.Fatalf("error creating kafka processor: %v", err) + } + defer proc.Close() + + var ( + ctr int + got []events.Message + ) + + // Fail after processing n messages. + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + err = proc.Process(ctx, topic, func(msg events.Message) error { + if ctr >= n { + return errors.New("error") + } + + got = append(got, msg) + ctr++ + + return nil + }) + if err == nil { + t.Fatalf("Process should have returned error: %v", err) + } + + // Wait for 1s to ensure that the offsets are commited. + time.Sleep(1 * time.Second) + + // Resume stream processing. + ctx, cancel = context.WithTimeout(context.Background(), timeout) + defer cancel() + + err = proc.Process(ctx, topic, func(msg events.Message) error { + got = append(got, msg) + + ctr++ + if ctr >= len(want) { + cancel() + } + + return nil + }) + if err != nil { + t.Fatalf("error processing messages: %v", err) + } + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("messages mismatch (-want +got):\n%v", diff) + } +} diff --git a/pkg/events/kafka/testdata/messages.json b/pkg/events/kafka/testdata/messages.json new file mode 100644 index 0000000..9831658 --- /dev/null +++ b/pkg/events/kafka/testdata/messages.json @@ -0,0 +1,161 @@ +[ + { + "key": "FindingID-1", + "value": { + "id": "FindingID-1", + "affected_resource": "AffectedResource-1", + "score": 9, + "status": "OPEN", + "details": "Details-1", + "impact_details": "ImpactDetails-1", + "issue": { + "id": "IssueID-1", + "summary": "Summary-1", + "cwe_id": 1, + "description": "Description-1", + "recommendations": ["Recommendation-1", "Recommendation-2"], + "reference_links": ["ReferenceLink-1", "ReferenceLink-2"], + "labels": ["Label-1", "Label-2"] + }, + "target": { + "id": "TargetID-1", + "identifier": "Identifier-1", + "teams": ["Team-1", "Team-2"] + }, + "source": { + "id": "SourceID-1", + "instance": "SourceInstance-1", + "options": "SourceOptions-1", + "time": "0001-01-01T00:00:00Z", + "name": "SourceName-1", + "component": "SourceComponent-1" + }, + "resources": [ + { + "name": "ResourceName-1", + "attributes": ["Attr-1", "Attr-2"], + "resources": [ + { + "Attr-1": "1", + "Attr-2": "2" + } + ] + } + ], + "total_exposure": 10, + "current_exposure": 5 + }, + "metadata": [ + { + "key": "version", + "value": "0.0.1" + } + ] + }, + { + "key": "FindingID-2", + "value": { + "id": "FindingID-2", + "affected_resource": "AffectedResource-1", + "score": 9, + "status": "OPEN", + "details": "Details-1", + "impact_details": "ImpactDetails-1", + "issue": { + "id": "IssueID-2", + "summary": "Summary-1", + "cwe_id": 1, + "description": "Description-1", + "recommendations": ["Recommendation-1", "Recommendation-2"], + "reference_links": ["ReferenceLink-1", "ReferenceLink-2"], + "labels": ["Label-1", "Label-2"] + }, + "target": { + "id": "TargetID-1", + "identifier": "Identifier-1", + "teams": ["Team-1", "Team-2"] + }, + "source": { + "id": "SourceID-1", + "instance": "SourceInstance-1", + "options": "SourceOptions-1", + "time": "0001-01-01T00:00:00Z", + "name": "SourceName-1", + "component": "SourceComponent-1" + }, + "resources": [ + { + "name": "ResourceName-1", + "attributes": ["Attr-1", "Attr-2"], + "resources": [ + { + "Attr-1": "1", + "Attr-2": "2" + } + ] + } + ], + "total_exposure": 10, + "current_exposure": 5 + }, + "headers": [ + { + "key": "version", + "value": "0.0.1" + } + ] + }, + { + "key": "FindingID-3", + "value": { + "id": "FindingID-3", + "affected_resource": "AffectedResource-1", + "score": 9, + "status": "OPEN", + "details": "Details-1", + "impact_details": "ImpactDetails-1", + "issue": { + "id": "IssueID-3", + "summary": "Summary-1", + "cwe_id": 1, + "description": "Description-1", + "recommendations": ["Recommendation-1", "Recommendation-2"], + "reference_links": ["ReferenceLink-1", "ReferenceLink-2"], + "labels": ["Label-1", "Label-2"] + }, + "target": { + "id": "TargetID-1", + "identifier": "Identifier-1", + "teams": ["Team-1", "Team-2"] + }, + "source": { + "id": "SourceID-1", + "instance": "SourceInstance-1", + "options": "SourceOptions-1", + "time": "0001-01-01T00:00:00Z", + "name": "SourceName-1", + "component": "SourceComponent-1" + }, + "resources": [ + { + "name": "ResourceName-1", + "attributes": ["Attr-1", "Attr-2"], + "resources": [ + { + "Attr-1": "1", + "Attr-2": "2" + } + ] + } + ], + "total_exposure": 10, + "current_exposure": 5 + }, + "headers": [ + { + "key": "version", + "value": "0.0.1" + } + ] + } +] diff --git a/pkg/events/stream.go b/pkg/events/stream.go new file mode 100644 index 0000000..7523fc9 --- /dev/null +++ b/pkg/events/stream.go @@ -0,0 +1,137 @@ +/* +Copyright 2022 Adevinta +*/ +// Package stream allows to interact with different stream-processing +// platforms. +package events + +import ( + "context" + "time" +) + +// Message represents a message coming from a stream. +type Message struct { + Key []byte + Value []byte + Headers []MetadataEntry +} + +// MetadataEntry represents a metadata entry. +type MetadataEntry struct { + Key []byte + Value []byte +} + +// A Processor represents a stream message processor. +type Processor interface { + Process(ctx context.Context, entity string, h MsgHandler) error +} + +// A MsgHandler processes a message. +type MsgHandler func(msg Message) error + +// Target represents the target +// scope for a check execution. +type Target struct { + ID string `json:"id"` + Identifier string `json:"identifier"` +} + +// TargetTeams represents a target along with its associated teams. +type TargetTeams struct { + Target + Teams []string `json:"teams"` +} + +// Issue represents a security vulnerability. +type Issue struct { + ID string `json:"id"` + Summary string `json:"summary"` + CWEID uint32 `json:"cwe_id"` + Description string `json:"description"` + Recommendations []string `json:"recommendations"` + ReferenceLinks []string `json:"reference_links"` +} + +// IssueLabels represents an issue along with its associated labels. +type IssueLabels struct { + Issue + Labels []string `json:"labels"` +} + +// SourceFamily represents the set of sources with same +// name, component and target. +type SourceFamily struct { + Name string `json:"name" db:"name"` + Component string `json:"component" db:"component"` + Target string `json:"-"` +} + +// Source represents a source +// which reports vulnerabilities. +type Source struct { + ID string `json:"id" db:"id"` + Instance string `json:"instance" db:"instance"` + Options string `json:"options" db:"options"` + Time time.Time `json:"time" db:"time"` + SourceFamily +} + +// Finding represents the relationship between an Issue and a Target. +type Finding struct { + ID string `json:"id"` + AffectedResource string `json:"affected_resource"` + Score float32 `json:"score"` + Status string `json:"status"` + Details string `json:"details"` + ImpactDetails string `json:"impact_details"` + Issue Issue `json:"issue"` + + // TotalExposure contains the time, in hours, a finding was open during its + // entire lifetime. For instance: if the finding was open for one day, then + // it was fixed, open again for onther day and finally fixed, the + // TotalExposure would be 48. If the finding is OPEN the TotalExposure also + // includes the period between the last time it was found and now. + TotalExposure int64 `json:"total_exposure"` + + // Resources stores the resources of the finding, if any, in as a non typed json. + Resources Resources `json:"resources"` + //OpenFinding defines the fields that only open findings have defined. + // *OpenFinding +} + +// Resources defines the structure of a the resources of a finding. +type Resources []ResourceGroup + +// ResourceGroup reprents a resource in a finding. +type ResourceGroup struct { + Name string `json:"name"` + Attributes []string `json:"attributes"` + Resources []map[string]string `json:"resources"` +} + +// FindingExpanded represents a finding expanding the associated target, issue +// and source data. +type FindingExpanded struct { + Finding + Issue IssueLabels `json:"issue"` + Target TargetTeams `json:"target"` + Source Source `json:"source"` + Resources Resources `json:"resources"` + TotalExposure int64 `json:"total_exposure"` + CurrentExposure int64 `json:"current_exposure,omitempty"` +} + +// FindingNotification represents a notification associated with a finding state change. +type FindingNotification struct { + FindingExpanded + // TODO: Tag field is defined here in order for the FindingNotification struct to be + // backward compatible with the "old" vulnerability notifications format so integrations + // with external components that still use the old representation (e.g.: Hermes) are + // maintained and can be isolated in the notify pkg. Once these integrations are modified + // to use the new notification format we'll have to decide if we make this tag field + // serializable into JSON and keep using the tag field as identifier, or we migrate to use + // the current team identifier. + Tag string `json:"-"` +} diff --git a/pkg/events/streamtest/streamtest.go b/pkg/events/streamtest/streamtest.go new file mode 100644 index 0000000..3cb3d31 --- /dev/null +++ b/pkg/events/streamtest/streamtest.go @@ -0,0 +1,89 @@ +/* +Copyright 2022 Adevinta +*/ +// Package streamtest provides utilities for stream testing. +package streamtest + +import ( + "context" + "encoding/json" + "os" + + "github.com/adevinta/vulcan-tracker/pkg/events" +) + +// MustParse parses a json file with messages and returns them. It panics if +// the file cannot be parsed. +func MustParse(filename string) []events.Message { + f, err := os.Open(filename) + if err != nil { + panic(err) + } + defer f.Close() + + var testdata []struct { + Key *string `json:"key,omitempty"` + Value *events.FindingNotification `json:"value,omitempty"` + Headers []struct { + Key string `json:"key"` + Value string `json:"value"` + } `json:"headers,omitempty"` + } + + if err := json.NewDecoder(f).Decode(&testdata); err != nil { + panic(err) + } + + var msgs []events.Message + for _, td := range testdata { + var msg events.Message + if td.Key != nil { + msg.Key = []byte(*td.Key) + } + if td.Value != nil { + payload, err := json.Marshal(td.Value) + if err != nil { + panic(err) + } + msg.Value = payload + } + for _, e := range td.Headers { + if e.Key == "" { + panic("empty metadata key") + } + if e.Value == "" { + panic("empty metadata value") + } + entry := events.MetadataEntry{ + Key: []byte(e.Key), + Value: []byte(e.Value), + } + msg.Headers = append(msg.Headers, entry) + } + msgs = append(msgs, msg) + } + + return msgs +} + +// MockProcessor mocks a stream processor with a predefined set of messages. It +// implements the interface [stream.Processor]. +type MockProcessor struct { + msgs []events.Message +} + +// NewMockProcessor returns a [MockProcessor]. It initializes its internal list +// of messages with msgs. +func NewMockProcessor(msgs []events.Message) *MockProcessor { + return &MockProcessor{msgs} +} + +// Process processes the messages passed to [NewMockProcessor]. +func (mp *MockProcessor) Process(ctx context.Context, entity string, h events.MsgHandler) error { + for _, msg := range mp.msgs { + if err := h(msg); err != nil { + return err + } + } + return nil +} diff --git a/pkg/events/streamtest/streamtest_test.go b/pkg/events/streamtest/streamtest_test.go new file mode 100644 index 0000000..c995492 --- /dev/null +++ b/pkg/events/streamtest/streamtest_test.go @@ -0,0 +1,148 @@ +/* +Copyright 2022 Adevinta +*/ +package streamtest + +import ( + "testing" + "unicode" + + "github.com/adevinta/vulcan-tracker/pkg/events" + "github.com/google/go-cmp/cmp" +) + +func removeSpaces(s string) string { + rr := make([]rune, 0, len(s)) + for _, r := range s { + if !unicode.IsSpace(r) { + rr = append(rr, r) + } + } + return string(rr) +} + +func TestParse(t *testing.T) { + tests := []struct { + name string + filename string + want []events.Message + shouldPanic bool + }{ + { + name: "valid file", + filename: "testdata/valid.json", + want: []events.Message{ + { + Key: []byte("FindingID-1"), + Value: []byte(removeSpaces(` + { + "id": "FindingID-1", + "affected_resource": "AffectedResource-1", + "score": 9, + "status": "OPEN", + "details": "Details-1", + "impact_details": "ImpactDetails-1", + "issue": { + "id": "IssueID-1", + "summary": "Summary-1", + "cwe_id": 1, + "description": "Description-1", + "recommendations": [ + "Recommendation-1", + "Recommendation-2" + ], + "reference_links": [ + "ReferenceLink-1", + "ReferenceLink-2" + ], + "labels": [ + "Label-1", + "Label-2" + ] + }, + "target": { + "id": "TargetID-1", + "identifier": "Identifier-1", + "teams": [ + "Team-1", + "Team-2" + ] + }, + "source": { + "id": "SourceID-1", + "instance": "SourceInstance-1", + "options": "SourceOptions-1", + "time": "0001-01-01T00:00:00Z", + "name": "SourceName-1", + "component": "SourceComponent-1" + }, + "resources": [ + { + "name": "ResourceName-1", + "attributes": [ + "Attr-1", + "Attr-2" + ], + "resources": [ + { + "Attr-1": "1", + "Attr-2": "2" + } + ] + } + ], + "total_exposure": 10, + "current_exposure": 5 + } + `)), + Headers: []events.MetadataEntry{ + { + Key: []byte("version"), + Value: []byte("0.0.1"), + }, + }, + }, + }, + shouldPanic: false, + }, + { + name: "malformed json", + filename: "testdata/malformed_json.json", + want: nil, + shouldPanic: true, + }, + { + name: "null metadata key", + filename: "testdata/malformed_null_metadata_key.json", + want: nil, + shouldPanic: true, + }, + { + name: "null metadata value", + filename: "testdata/malformed_null_metadata_value.json", + want: nil, + shouldPanic: true, + }, + { + name: "nonexistent file", + filename: "testdata/nonexistent.json", + want: nil, + shouldPanic: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defer func() { + if err := recover(); (err != nil) != tt.shouldPanic { + t.Errorf("unexpected panic behavior: %v", err) + } + }() + + got := MustParse(tt.filename) + if diff := cmp.Diff(tt.want, got); diff != "" { + t.Errorf("asset mismatch (-want +got):\n%v", diff) + } + }) + } +} diff --git a/pkg/events/streamtest/testdata/malformed_json.json b/pkg/events/streamtest/testdata/malformed_json.json new file mode 100644 index 0000000..afa8b2c --- /dev/null +++ b/pkg/events/streamtest/testdata/malformed_json.json @@ -0,0 +1,49 @@ +[ + { + "id": "2172c7d2-4cee-4eee-9573-9a8300d6657e", + "affected_resource": "onetimesecret.mpi-internal.com", + "score": 3.9, + "status": "FIXED", + "details": "", + "impact_details": "", + "issue": { + "id": "b4de81a0-f446-4f4a-9d58-40e161d6cd9d", + "summary": "HTTP Content Security Policy Not Implemented", + "cwe_id": 358, + "description": "Content Security Policy (CSP) is an HTTP header that allows site operators fine-grained control over where resources on their site can be loaded from. The use of this header is the best method to prevent cross-site scripting (XSS) vulnerabilities. Due to the difficulty in retrofitting CSP into existing websites, CSP is mandatory for all new websites and is strongly recommended for all existing high-risk sites.", + "recommendations": [ + "Implement a well-formed and correct CSP policy for this site.", + "The policy should not contain unsafe directive for anything but style.", + "The policy should not allow to load content from an insecure scheme (only https)." + ], + "reference_links": [ + "https://wiki.mozilla.org/Security/Guidelines/Web_Security#Content_Security_Policy", + "https://developer.mozilla.org/en-US/docs/Web/HTTP/CSP", + "https://content-security-policy.com/", + "https://observatory.mozilla.org/" + ], + "labels": [ + "http", + "issue" + ] + }, + "target": { + "id": "ef395371-b7bd-4dfd-9ebf-d32156945fc2", + "identifier": "onetimesecret.mpi-internal.com", + "teams": [ + "1c8a9935-9422-47d1-9bf6-72dde721c49f", + "2af823c1-b029-4c62-9d31-3be1d18a14d5", + "9f6b3510-4cf4-4408-b063-86a75119128e" + ] + }, + "source": { + "id": "e72a908f-194e-4496-afa8-df509bb2c1a1", + "instance": "4febf6fd-b01c-11ec-82a8-f2c663317896", + "options": "{\"security_level\":2}", + "time": "2022-03-30T11:32:02Z", + "name": "vulcan", + "component": "vulcan-http-headers" + }, + "resources": [], + "total_exposure": 0 + diff --git a/pkg/events/streamtest/testdata/malformed_null_metadata_key.json b/pkg/events/streamtest/testdata/malformed_null_metadata_key.json new file mode 100644 index 0000000..5b93a39 --- /dev/null +++ b/pkg/events/streamtest/testdata/malformed_null_metadata_key.json @@ -0,0 +1,10 @@ +[ + { + "key": "key0", + "value": "value0", + "headers": { + "key": null, + "value": "hvalue00" + } + } +] diff --git a/pkg/events/streamtest/testdata/malformed_null_metadata_value.json b/pkg/events/streamtest/testdata/malformed_null_metadata_value.json new file mode 100644 index 0000000..55d06f5 --- /dev/null +++ b/pkg/events/streamtest/testdata/malformed_null_metadata_value.json @@ -0,0 +1,10 @@ +[ + { + "key": "key0", + "value": "value0", + "headers": { + "key": "hkey00", + "value": null + } + } +] diff --git a/pkg/events/streamtest/testdata/valid.json b/pkg/events/streamtest/testdata/valid.json new file mode 100644 index 0000000..681146b --- /dev/null +++ b/pkg/events/streamtest/testdata/valid.json @@ -0,0 +1,55 @@ +[ + { + "key": "FindingID-1", + "value": { + "id": "FindingID-1", + "affected_resource": "AffectedResource-1", + "score": 9, + "status": "OPEN", + "details": "Details-1", + "impact_details": "ImpactDetails-1", + "issue": { + "id": "IssueID-1", + "summary": "Summary-1", + "cwe_id": 1, + "description": "Description-1", + "recommendations": ["Recommendation-1", "Recommendation-2"], + "reference_links": ["ReferenceLink-1", "ReferenceLink-2"], + "labels": ["Label-1", "Label-2"] + }, + "target": { + "id": "TargetID-1", + "identifier": "Identifier-1", + "teams": ["Team-1", "Team-2"] + }, + "source": { + "id": "SourceID-1", + "instance": "SourceInstance-1", + "options": "SourceOptions-1", + "time": "0001-01-01T00:00:00Z", + "name": "SourceName-1", + "component": "SourceComponent-1" + }, + "resources": [ + { + "name": "ResourceName-1", + "attributes": ["Attr-1", "Attr-2"], + "resources": [ + { + "Attr-1": "1", + "Attr-2": "2" + } + ] + } + ], + "total_exposure": 10, + "current_exposure": 5 + }, + "headers": [ + { + "key": "version", + "value": "0.0.1" + } + ] + } +] diff --git a/pkg/log/log.go b/pkg/log/log.go new file mode 100644 index 0000000..5d61cc6 --- /dev/null +++ b/pkg/log/log.go @@ -0,0 +1,277 @@ +/* +Copyright 2022 Adevinta +*/ +// Package log exports logging primitives. +// +// This package is based on upspin.io/log. Its license as well as the original +// code can be found in the [upspin repository]. +// +// [upspin repository]: https://github.com/upspin/upspin +package log + +import ( + "bytes" + "fmt" + "io" + "log" + "os" + "sync" +) + +// Logger is the interface for logging messages. +type Logger interface { + // Printf writes a formated message to the log. + Printf(format string, v ...any) + + // Print writes a message to the log. + Print(v ...any) + + // Println writes a line to the log. + Println(v ...any) + + // Fatal writes a message to the log and aborts. + Fatal(v ...any) + + // Fatalf writes a formated message to the log and aborts. + Fatalf(format string, v ...any) +} + +// Level represents the level of logging. +type Level int + +// Different levels of logging. +const ( + DebugLevel Level = iota + InfoLevel + ErrorLevel + DisabledLevel +) + +// The set of default loggers for each log level. +var ( + Debug = &logger{DebugLevel} + Info = &logger{InfoLevel} + Error = &logger{ErrorLevel} +) + +type globalState struct { + currentLevel Level + defaultLogger Logger +} + +var ( + mu sync.RWMutex + state = globalState{ + currentLevel: InfoLevel, + defaultLogger: newDefaultLogger(os.Stderr), + } +) + +func globals() globalState { + mu.RLock() + defer mu.RUnlock() + return state +} + +func newDefaultLogger(w io.Writer) Logger { + return log.New(w, "", log.Ldate|log.Ltime|log.LUTC|log.Lmicroseconds) +} + +// logBridge augments the Logger type with the io.Writer interface enabling +// NewStdLogger to connect Go's standard library logger to the logger provided +// by this package. +type logBridge struct { + Logger +} + +// Write parses the standard logging line (configured with log.Lshortfile) and +// passes its message component to the logger provided by this package. +func (lb logBridge) Write(b []byte) (n int, err error) { + var message string + // Split "f.go:42: message" into "f.go", "42", and "message". + parts := bytes.SplitN(b, []byte{':'}, 3) + if len(parts) != 3 || len(parts[0]) < 1 || len(parts[2]) < 1 { + message = fmt.Sprintf("bad log format: %s", b) + } else { + message = string(parts[2][1:]) // Skip leading space. + } + lb.Print(message) + return len(b), nil +} + +// NewStdLogger creates a *log.Logger ("log" is from the Go standard library) +// that forwards messages to the provided logger using a logBridge. The +// standard logger is configured with log.Lshortfile, this log line +// format which is parsed to extract the log message (skipping the filename, +// line number) to forward it to the provided logger. +func NewStdLogger(l Logger) *log.Logger { + lb := logBridge{l} + return log.New(lb, "", log.Lshortfile) +} + +// SetOutput sets the default loggers to write to w. If w is nil, the default +// loggers are disabled. +func SetOutput(w io.Writer) { + mu.Lock() + defer mu.Unlock() + + if w == nil { + state.defaultLogger = nil + } else { + state.defaultLogger = newDefaultLogger(w) + } +} + +type logger struct { + level Level +} + +var _ Logger = (*logger)(nil) + +// Printf writes a formatted message to the log. +func (l *logger) Printf(format string, v ...any) { + g := globals() + + if l.level < g.currentLevel { + return // Don't log at lower levels. + } + if g.defaultLogger != nil { + g.defaultLogger.Printf(format, v...) + } +} + +// Print writes a message to the log. +func (l *logger) Print(v ...any) { + g := globals() + + if l.level < g.currentLevel { + return // Don't log at lower levels. + } + if g.defaultLogger != nil { + g.defaultLogger.Print(v...) + } +} + +// Println writes a line to the log. +func (l *logger) Println(v ...any) { + g := globals() + + if l.level < g.currentLevel { + return // Don't log at lower levels. + } + if g.defaultLogger != nil { + g.defaultLogger.Println(v...) + } +} + +// Fatal writes a message to the log and aborts, regardless of the current log +// level. +func (l *logger) Fatal(v ...any) { + g := globals() + + if g.defaultLogger != nil { + g.defaultLogger.Fatal(v...) + } else { + log.Fatal(v...) + } +} + +// Fatalf writes a formatted message to the log and aborts, regardless of the +// current log level. +func (l *logger) Fatalf(format string, v ...any) { + g := globals() + + if g.defaultLogger != nil { + g.defaultLogger.Fatalf(format, v...) + } else { + log.Fatalf(format, v...) + } +} + +// String returns the name of the logger. +func (l *logger) String() string { + return toString(l.level) +} + +func toString(level Level) string { + switch level { + case InfoLevel: + return "info" + case DebugLevel: + return "debug" + case ErrorLevel: + return "error" + case DisabledLevel: + return "disabled" + } + return "unknown" +} + +func toLevel(level string) (Level, error) { + switch level { + case "info": + return InfoLevel, nil + case "debug": + return DebugLevel, nil + case "error": + return ErrorLevel, nil + case "disabled": + return DisabledLevel, nil + } + return DisabledLevel, fmt.Errorf("invalid log level %q", level) +} + +// GetLevel returns the current logging level. +func GetLevel() string { + g := globals() + + return toString(g.currentLevel) +} + +// SetLevel sets the current level of logging. +func SetLevel(level string) error { + l, err := toLevel(level) + if err != nil { + return err + } + mu.Lock() + state.currentLevel = l + mu.Unlock() + return nil +} + +// At returns whether the level will be logged currently. +func At(level string) bool { + g := globals() + + l, err := toLevel(level) + if err != nil { + return false + } + return g.currentLevel <= l +} + +// Printf writes a formatted message to the log. +func Printf(format string, v ...any) { + Info.Printf(format, v...) +} + +// Print writes a message to the log. +func Print(v ...any) { + Info.Print(v...) +} + +// Println writes a line to the log. +func Println(v ...any) { + Info.Println(v...) +} + +// Fatal writes a message to the log and aborts. +func Fatal(v ...any) { + Info.Fatal(v...) +} + +// Fatalf writes a formatted message to the log and aborts. +func Fatalf(format string, v ...any) { + Info.Fatalf(format, v...) +} diff --git a/pkg/log/log_test.go b/pkg/log/log_test.go new file mode 100644 index 0000000..a754ec8 --- /dev/null +++ b/pkg/log/log_test.go @@ -0,0 +1,116 @@ +/* +Copyright 2022 Adevinta +*/ +package log + +import ( + "fmt" + "testing" +) + +func TestLogLevel(t *testing.T) { + const ( + msg1 = "log line1" + msg2 = "log line2" + msg3 = "log line3" + ) + setMockLogger(fmt.Sprintf("%shello: %s", msg2, msg3), false) + + level := "info" + mustSetLevel(level) + if GetLevel() != level { + t.Fatalf("Expected %q, got %q", level, GetLevel()) + } + Debug.Println(msg1) // not logged + Info.Print(msg2) // logged + Error.Printf("hello: %s", msg3) // logged + + globals().defaultLogger.(*mockLogger).Verify(t) +} + +func TestDisable(t *testing.T) { + setMockLogger("Starting server...", false) + mustSetLevel("debug") + Debug.Printf("Starting server...") + mustSetLevel("disabled") + Error.Printf("Important stuff you'll miss!") + globals().defaultLogger.(*mockLogger).Verify(t) +} + +func TestFatal(t *testing.T) { + const msg = "will abort anyway" + + setMockLogger(msg, true) + + mustSetLevel("error") + Info.Fatal(msg) + + globals().defaultLogger.(*mockLogger).Verify(t) +} + +func TestAt(t *testing.T) { + mustSetLevel("info") + + if At("debug") { + t.Errorf("Debug is expected to be disabled when level is info") + } + if !At("error") { + t.Errorf("Error is expected to be enabled when level is info") + } +} + +func TestDisableDefaultLoggers(t *testing.T) { + SetOutput(nil) // disable default loggers. + Print("not printed") +} + +func mustSetLevel(level string) { + if err := SetLevel(level); err != nil { + panic(err) + } +} + +func setMockLogger(expected string, fatalExpected bool) { + state.defaultLogger = &mockLogger{ + expected: expected, + fatalExpected: fatalExpected, + } +} + +type mockLogger struct { + fatal bool + logged string + expected string + fatalExpected bool +} + +func (ml *mockLogger) Printf(format string, v ...interface{}) { + ml.logged += fmt.Sprintf(format, v...) +} + +func (ml *mockLogger) Print(v ...interface{}) { + ml.logged += fmt.Sprint(v...) +} + +func (ml *mockLogger) Println(v ...interface{}) { + ml.logged += fmt.Sprintln(v...) +} + +func (ml *mockLogger) Fatal(v ...interface{}) { + ml.fatal = true + ml.Print(v...) +} + +func (ml *mockLogger) Fatalf(format string, v ...interface{}) { + ml.fatal = true + ml.Printf(format, v...) +} + +func (ml *mockLogger) Verify(t *testing.T) { + if ml.logged != ml.expected { + t.Errorf("Expected %q, got %q", ml.expected, ml.logged) + } + if ml.fatal != ml.fatalExpected { + t.Errorf("Expected fatal %v, got %v", ml.fatalExpected, ml.fatal) + } +} diff --git a/pkg/model/model.go b/pkg/model/model.go index 8965233..5519b9d 100644 --- a/pkg/model/model.go +++ b/pkg/model/model.go @@ -40,4 +40,5 @@ type ProjectConfig struct { VulnerabilityIssueType string FixedWorkflow []string WontFixWorkflow []string + AutoCreate bool } diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index d714f82..8e7bd9e 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -1,3 +1,6 @@ +/* +Copyright 2022 Adevinta +*/ package storage import "github.com/adevinta/vulcan-tracker/pkg/model" diff --git a/pkg/storage/toml.go b/pkg/storage/toml.go index 2526df9..48c5b33 100644 --- a/pkg/storage/toml.go +++ b/pkg/storage/toml.go @@ -1,3 +1,6 @@ +/* +Copyright 2022 Adevinta +*/ package storage import ( @@ -51,6 +54,7 @@ func (ts *TOMLStore) ProjectsConfig() ([]model.ProjectConfig, error) { VulnerabilityIssueType: team.VulnerabilityIssueType, FixedWorkflow: team.FixWorkflow, WontFixWorkflow: team.WontFixWorkflow, + AutoCreate: team.AutoCreate, } projectConfigs = append(projectConfigs, teamConfig) } @@ -73,6 +77,7 @@ func (ts *TOMLStore) ProjectConfig(teamId string) (*model.ProjectConfig, error) VulnerabilityIssueType: team.VulnerabilityIssueType, FixedWorkflow: team.FixWorkflow, WontFixWorkflow: team.WontFixWorkflow, + AutoCreate: team.AutoCreate, } return projectConfig, nil diff --git a/pkg/storage/toml_test.go b/pkg/storage/toml_test.go index 6d0920e..9780d82 100644 --- a/pkg/storage/toml_test.go +++ b/pkg/storage/toml_test.go @@ -9,12 +9,12 @@ import ( "github.com/adevinta/vulcan-tracker/pkg/config" "github.com/adevinta/vulcan-tracker/pkg/model" - testutil "github.com/adevinta/vulcan-tracker/pkg/testutils" + "github.com/adevinta/vulcan-tracker/pkg/testutils" "github.com/google/go-cmp/cmp" ) func errToStr(err error) string { - return testutil.ErrToStr(err) + return testutils.ErrToStr(err) } func TestServersConf(t *testing.T) { diff --git a/pkg/testutils/kafka.go b/pkg/testutils/kafka.go new file mode 100644 index 0000000..930eeda --- /dev/null +++ b/pkg/testutils/kafka.go @@ -0,0 +1,98 @@ +/* +Copyright 2022 Adevinta +*/ +package testutils + +import ( + "context" + "fmt" + "runtime" + "strings" + "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" +) + +// KafkaTestBroker contains the address of the local broker used for tests. +const KafkaTestBroker = "localhost:29092" + +// PrepareKafka creates a new empty topic in the local test Kafka server for +// each topic name present in topics maps. The name of the topics created will +// have the following shape: +// __test. It returns a new map +// with the entities remapped to new topics created. +func PrepareKafka(topic string) (map[string]string, error) { + // Generate a unique deterministic topic name for the caller of this function. + newTopics := map[string]string{} + var newTopicNames []string + pc, _, _, _ := runtime.Caller(1) + callerName := strings.Replace(runtime.FuncForPC(pc).Name(), ".", "_", -1) + callerName = strings.Replace(callerName, "-", "_", -1) + parts := strings.Split(callerName, "/") + name := strings.ToLower(fmt.Sprintf("%s_%s_test", topic, parts[len(parts)-1])) + // newTopics[entity] = name + newTopicNames = append(newTopicNames, name) + + err := createTopics(newTopicNames) + if err != nil { + return nil, err + } + return newTopics, nil +} + +func createTopics(names []string) error { + config := kafka.ConfigMap{ + "bootstrap.servers": KafkaTestBroker, + } + client, err := kafka.NewAdminClient(&config) + if err != nil { + return err + } + + waitDuration := time.Duration(time.Second * 60) + opTimeout := kafka.SetAdminOperationTimeout(waitDuration) + + results, err := client.DeleteTopics(context.Background(), names, opTimeout) + if err != nil { + return err + } + tResults := topicsOpResult(results) + if tResults.Error() != kafka.ErrNoError && tResults.Error() != kafka.ErrUnknownTopicOrPart { + return fmt.Errorf("error deleting topic %s", tResults.Error()) + } + + var topics []kafka.TopicSpecification + for _, name := range names { + topic := kafka.TopicSpecification{ + Topic: name, + NumPartitions: 1, + } + topics = append(topics, topic) + } + for { + results, err = client.CreateTopics(context.Background(), topics, opTimeout) + if err != nil { + return err + } + tResults = topicsOpResult(results) + if tResults.Error() == kafka.ErrNoError { + break + } + if tResults.Error() == kafka.ErrTopicAlreadyExists { + continue + } + return fmt.Errorf("error creating topics: %s", tResults.Error()) + } + return nil +} + +type topicsOpResult []kafka.TopicResult + +func (t topicsOpResult) Error() kafka.ErrorCode { + for _, res := range t { + if res.Error.Code() != kafka.ErrNoError { + return res.Error.Code() + } + } + return kafka.ErrNoError +} diff --git a/pkg/testutils/testutils.go b/pkg/testutils/testutils.go index d656054..4d1eb25 100644 --- a/pkg/testutils/testutils.go +++ b/pkg/testutils/testutils.go @@ -2,7 +2,7 @@ Copyright 2022 Adevinta */ -package testutil +package testutils func ErrToStr(err error) string { result := "" diff --git a/pkg/tracking/jira/client.go b/pkg/tracking/jira/client.go index 38069f0..a045298 100644 --- a/pkg/tracking/jira/client.go +++ b/pkg/tracking/jira/client.go @@ -16,6 +16,7 @@ import ( type Issuer interface { Get(issueID string, options *gojira.GetQueryOptions) (*gojira.Issue, *gojira.Response, error) + Search(jql string, options *gojira.SearchOptions) ([]gojira.Issue, *gojira.Response, error) Create(issue *gojira.Issue) (*gojira.Issue, *gojira.Response, error) GetTransitions(id string) ([]gojira.Transition, *gojira.Response, error) DoTransition(ticketID, transitionID string) (*gojira.Response, error) @@ -88,7 +89,29 @@ func (cl *Client) GetTicket(id string) (*model.Ticket, error) { return nil, err } return fromGoJiraToTicketModel(*jiraIssue), nil +} + +// FindTicket search tickets and return the first one if it exists. +// The arguments needed to search a ticke are the project key, the issue +// type and a text that have o be present on the ticket description. +// Return a nil ticket if not found. +func (cl *Client) FindTicket(projectKey, vulnerabilityIssueType, text string) (*model.Ticket, error) { + + jql := fmt.Sprintf("project=%s AND type=%s AND description~%s", + projectKey, vulnerabilityIssueType, text) + searchOptions := &gojira.SearchOptions{ + MaxResults: 1, + } + tickets, resp, err := cl.Issuer.Search(jql, searchOptions) + if err != nil { + err = gojira.NewJiraError(resp, err) + return nil, err + } + if len(tickets) == 0 { + return nil, nil + } + return fromGoJiraToTicketModel(tickets[0]), nil } // CreateTicket creates a ticket in Jira. diff --git a/pkg/tracking/jira/client_test.go b/pkg/tracking/jira/client_test.go index a28c9b4..83fb73d 100644 --- a/pkg/tracking/jira/client_test.go +++ b/pkg/tracking/jira/client_test.go @@ -6,6 +6,7 @@ package jira import ( "errors" "fmt" + "strings" "testing" "github.com/adevinta/vulcan-tracker/pkg/model" @@ -28,12 +29,45 @@ func (mis *MockIssueService) Get(issueID string, options *gojira.GetQueryOptions return nil, nil, fmt.Errorf("key %s not found. Status code: 404", issueID) } +// Get retrieves a tikcet by issueID. +func (mis *MockIssueService) Search(jql string, options *gojira.SearchOptions) ([]gojira.Issue, *gojira.Response, error) { + + splitted := strings.Split(jql, " ") + project := strings.Split(splitted[0], "=")[1] + issueType := strings.Split(splitted[2], "=")[1] + descriptionText := strings.Split(splitted[4], "~")[1] + + var ticketsFound []gojira.Issue + + for _, ticket := range mis.tickets { + if ticket.Fields.Project.Key != project { + continue + } + if ticket.Fields.Type.Name != issueType { + continue + } + if strings.Contains(ticket.Fields.Description, descriptionText) { + ticketsFound = append(ticketsFound, ticket) + } + + } + + return ticketsFound, nil, nil +} + func (mis *MockIssueService) Create(issue *gojira.Issue) (*gojira.Issue, *gojira.Response, error) { + + if issue.Fields.Summary == "" { + return nil, nil, fmt.Errorf("summary is mandatory. Status code: 400") + } + if issue.Fields.Type.Name == "" { + return nil, nil, fmt.Errorf("issue type is mandatory. Status code: 400") + } + issue.Key = fmt.Sprintf("%s-%d", issue.Fields.Project.Key, len(mis.tickets)+1) issue.ID = fmt.Sprintf("%d", 1000+len(mis.tickets)+1) issue.Fields.Status = &gojira.Status{Name: ToDo} mis.tickets[issue.Key] = *issue - return issue, nil, nil } @@ -48,7 +82,7 @@ func setupSubTestClient(t *testing.T) { Key: "TEST-1", Fields: &gojira.IssueFields{ Summary: "Summary TEST-1", - Description: "Description TEST-1", + Description: "Description TEST-1\nFindingID: FindingID-1", Project: gojira.Project{ Key: "TEST", }, @@ -65,7 +99,7 @@ func setupSubTestClient(t *testing.T) { Key: "TEST-2", Fields: &gojira.IssueFields{ Summary: "Summary TEST-2", - Description: "Description TEST-2", + Description: "Description TEST-2\nFindingID: FindingID-2", Project: gojira.Project{ Key: "TEST", }, @@ -102,7 +136,7 @@ func TestClient_Get(t *testing.T) { ID: "1000", Key: "TEST-1", Summary: "Summary TEST-1", - Description: "Description TEST-1", + Description: "Description TEST-1\nFindingID: FindingID-1", Project: "TEST", Status: ToDo, TicketType: "Vulnerability", @@ -177,4 +211,55 @@ func TestClient_Create(t *testing.T) { } } +func TestClient_Find(t *testing.T) { + tests := []struct { + name string + findingID string + projectKey string + vulnerabilityIssueType string + want *model.Ticket + wantErr error + }{ + { + name: "HappyPath", + findingID: "FindingID-1", + projectKey: "TEST", + vulnerabilityIssueType: "Vulnerability", + want: &model.Ticket{ + ID: "1000", + Key: "TEST-1", + Summary: "Summary TEST-1", + Description: "Description TEST-1\nFindingID: FindingID-1", + Project: "TEST", + Status: ToDo, + TicketType: "Vulnerability", + }, + wantErr: nil, + }, + { + name: "TicketNotFound", + findingID: "NOTFOUND", + projectKey: "TEST", + vulnerabilityIssueType: "Vulnerability", + want: nil, + wantErr: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + setupSubTestClient(t) + + got, err := jiraClient.FindTicket(tt.projectKey, tt.vulnerabilityIssueType, tt.findingID) + if errToStr(err) != errToStr(tt.wantErr) { + t.Fatalf("expected error: %v but got: %v", tt.wantErr, err) + } + diff := cmp.Diff(got, tt.want) + if diff != "" { + t.Fatalf("ticket does not match expected one. diff: %s\n", diff) + } + }) + } +} + // TODO: Pending more testing diff --git a/pkg/tracking/jira/jira.go b/pkg/tracking/jira/jira.go index 509827d..93f487a 100644 --- a/pkg/tracking/jira/jira.go +++ b/pkg/tracking/jira/jira.go @@ -18,6 +18,7 @@ type ( // TicketTrackingClient defines the API of the adapter for a third-party client. type TicketTrackingClient interface { GetTicket(id string) (*model.Ticket, error) + FindTicket(projectKey, vulnerabilityIssueType, text string) (*model.Ticket, error) CreateTicket(ticket *model.Ticket) (*model.Ticket, error) GetTicketTransitions(id string) ([]model.Transition, error) DoTransition(id, idTransition string) error diff --git a/pkg/tracking/jira/tickets.go b/pkg/tracking/jira/tickets.go index 2ea3a7e..5aff111 100644 --- a/pkg/tracking/jira/tickets.go +++ b/pkg/tracking/jira/tickets.go @@ -19,6 +19,17 @@ func (tc TC) GetTicket(id string) (*model.Ticket, error) { return ticket, nil } +// FindTicketByFindingID retrieves a ticket from Jira using the findingID. +// Return an empty ticket if not found. +func (tc TC) FindTicketByFindingID(projectKey, vulnerabilityIssueType, findingID string) (*model.Ticket, error) { + + ticket, err := tc.Client.FindTicket(projectKey, vulnerabilityIssueType, findingID) + if err != nil { + return nil, err + } + return ticket, nil +} + // CreateTicket creates an ticket in Jira. func (tc TC) CreateTicket(ticket *model.Ticket) (*model.Ticket, error) { createdTicket, err := tc.Client.CreateTicket(ticket) @@ -77,6 +88,9 @@ func (tc TC) FixTicket(id string, workflow []string) (*model.Ticket, error) { workflow = getSubWorkflow(workflow, ticket.Status) } + // TODO: When we can to transit from any state to a fixed state we need a list of valid states to do this. + // For example. If the ticket is in a CLOSED state we don't want acidentally go back to RESOLVED. + for _, transitionName := range workflow { // Get the available transitions for the ticket. diff --git a/pkg/tracking/jira/tickets_test.go b/pkg/tracking/jira/tickets_test.go index f4aa415..1754ad7 100644 --- a/pkg/tracking/jira/tickets_test.go +++ b/pkg/tracking/jira/tickets_test.go @@ -24,6 +24,7 @@ const ( ) type MockJiraClient struct { + TicketTrackingClient tickets map[string]*model.Ticket transitions map[string][]model.Transition } diff --git a/pkg/tracking/tracking.go b/pkg/tracking/tracking.go index cdb2141..d05fb00 100644 --- a/pkg/tracking/tracking.go +++ b/pkg/tracking/tracking.go @@ -33,6 +33,7 @@ type Pagination struct { // TicketTracker defines the interface for high level querying data from ticket tracker. type TicketTracker interface { GetTicket(id string) (*model.Ticket, error) + FindTicketByFindingID(projectKey, vulnerabilityIssueType, findingID string) (*model.Ticket, error) CreateTicket(ticket *model.Ticket) (*model.Ticket, error) GetTransitions(id string) ([]model.Transition, error) FixTicket(id string, workflow []string) (*model.Ticket, error) diff --git a/pkg/vulcan/vulcan.go b/pkg/vulcan/vulcan.go new file mode 100644 index 0000000..4c87c8e --- /dev/null +++ b/pkg/vulcan/vulcan.go @@ -0,0 +1,115 @@ +/* +Copyright 2022 Adevinta +*/ +package vulcan + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strconv" + "strings" + + "github.com/adevinta/vulcan-tracker/pkg/events" +) + +const ( + // MajorVersion is the major version of the vulnerability db asynchronous API + // supported by [Client]. + MajorVersion = 0 + + // FindingEntityName is the name of the entity linked to findings. + FindingEntityName = "findings-v0" +) + +// Client is a vulnerability db async API client. +type Client struct { + proc events.Processor +} + +var ErrUnsupportedVersion = errors.New("unsupported version") + +// NewClient returns a client for the vulnerability db async API using the provided +// stream processor. +func NewClient(proc events.Processor) Client { + return Client{proc} +} + +// FindingHandler processes a finding. isNil is true when the value of the stream +// message is nil. +type FindingHandler func(payload events.FindingNotification, isNil bool) error + +// ProcessFindings receives findigns from the underlying stream and processes them +// using the provided handler. This method blocks the calling goroutine until +// the specified context is cancelled. +func (c Client) ProcessFindings(ctx context.Context, h FindingHandler) error { + return c.proc.Process(ctx, FindingEntityName, func(msg events.Message) error { + version, err := parseMetadata(msg) + if err != nil { + return fmt.Errorf("invalid metadata: %w", err) + } + + if !supportedVersion(version) { + return ErrUnsupportedVersion + } + + id := string(msg.Key) + + var ( + payload events.FindingNotification + isNil bool + ) + + if msg.Value != nil { + if err := json.Unmarshal(msg.Value, &payload); err != nil { + return fmt.Errorf("could not unmarshal finding with ID %q: %w", id, err) + } + } + + return h(payload, isNil) + }) +} + +// parseMetadata parses and validates message metadata. +func parseMetadata(msg events.Message) (version string, err error) { + for _, e := range msg.Headers { + key := string(e.Key) + value := string(e.Value) + + switch key { + case "version": + version = value + } + } + + if version == "" { + return "", errors.New("missing metadata entry") + } + + return version, nil +} + +// supportedVersion takes a semantic version string and returns true if it is +// compatible with [Client]. +func supportedVersion(v string) bool { + if v == "" { + return false + } + + if v[0] == 'v' { + v = v[1:] + } + + parts := strings.Split(v, ".") + if len(parts) < 3 { + return false + } + + major, err := strconv.Atoi(parts[0]) + if err != nil { + return false + } + + return major == MajorVersion +}