Skip to content

Commit d507d01

Browse files
authored
Merge pull request #2 from k6io/dropDependancies
Drop dependancies
2 parents 2d98b43 + 37d8bb4 commit d507d01

File tree

5 files changed

+267
-12
lines changed

5 files changed

+267
-12
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.16
44

55
require (
66
github.com/Shopify/sarama v1.16.0
7+
github.com/influxdata/influxdb1-client v0.0.0-20190402204710-8ff2fc3824fc
78
github.com/kelseyhightower/envconfig v1.4.0
89
github.com/kubernetes/helm v2.9.0+incompatible
910
github.com/loadimpact/k6 v0.31.1

pkg/kafka/collector.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@ import (
2929
"github.com/sirupsen/logrus"
3030

3131
"github.com/loadimpact/k6/output"
32-
jsono "github.com/loadimpact/k6/output/json"
3332
"github.com/loadimpact/k6/stats"
34-
"github.com/loadimpact/k6/stats/influxdb"
3533
)
3634

3735
// Collector implements the lib.Collector interface and should be used only for testing
@@ -118,19 +116,18 @@ func (c *Collector) formatSamples(samples stats.Samples) ([]string, error) {
118116

119117
switch c.Config.Format.String {
120118
case "influxdb":
121-
i, err := influxdb.New(c.logger, c.Config.InfluxDBConfig)
119+
var err error
120+
fieldKinds, err := makeInfluxdbFieldKinds(c.Config.InfluxDBConfig.TagsAsFields)
122121
if err != nil {
123122
return nil, err
124123
}
125-
126-
metrics, err = i.Format(samples)
124+
metrics, err = formatAsInfluxdbV1(c.logger, samples, newExtractTagsFields(fieldKinds))
127125
if err != nil {
128126
return nil, err
129127
}
130128
default:
131129
for _, sample := range samples {
132-
env := jsono.WrapSample(sample)
133-
metric, err := json.Marshal(env)
130+
metric, err := json.Marshal(wrapSample(sample))
134131
if err != nil {
135132
return nil, err
136133
}

pkg/kafka/config.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"gopkg.in/guregu/null.v3"
3131

3232
"github.com/loadimpact/k6/lib/types"
33-
"github.com/loadimpact/k6/stats/influxdb"
3433
)
3534

3635
// Config is the config for the kafka collector
@@ -43,7 +42,7 @@ type Config struct {
4342
Format null.String `json:"format" envconfig:"K6_KAFKA_FORMAT"`
4443
PushInterval types.NullDuration `json:"push_interval" envconfig:"K6_KAFKA_PUSH_INTERVAL"`
4544

46-
InfluxDBConfig influxdb.Config `json:"influxdb"`
45+
InfluxDBConfig influxdbConfig `json:"influxdb"`
4746
}
4847

4948
// config is a duplicate of ConfigFields as we can not mapstructure.Decode into
@@ -54,15 +53,15 @@ type config struct {
5453
Format string `json:"format" mapstructure:"format" envconfig:"K6_KAFKA_FORMAT"`
5554
PushInterval string `json:"push_interval" mapstructure:"push_interval" envconfig:"K6_KAFKA_PUSH_INTERVAL"`
5655

57-
InfluxDBConfig influxdb.Config `json:"influxdb" mapstructure:"influxdb"`
56+
InfluxDBConfig influxdbConfig `json:"influxdb" mapstructure:"influxdb"`
5857
}
5958

6059
// NewConfig creates a new Config instance with default values for some fields.
6160
func NewConfig() Config {
6261
return Config{
6362
Format: null.StringFrom("json"),
6463
PushInterval: types.NullDurationFrom(1 * time.Second),
65-
InfluxDBConfig: influxdb.NewConfig(),
64+
InfluxDBConfig: newInfluxdbConfig(),
6665
}
6766
}
6867

@@ -96,7 +95,7 @@ func ParseArg(arg string) (Config, error) {
9695
}
9796

9897
if v, ok := params["influxdb"].(map[string]interface{}); ok {
99-
influxConfig, err := influxdb.ParseMap(v)
98+
influxConfig, err := influxdbParseMap(v)
10099
if err != nil {
101100
return c, err
102101
}

pkg/kafka/format_influxdb.go

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
*
3+
* k6 - a next-generation load testing tool
4+
* Copyright (C) 2021 Load Impact
5+
*
6+
* This program is free software: you can redistribute it and/or modify
7+
* it under the terms of the GNU Affero General Public License as
8+
* published by the Free Software Foundation, either version 3 of the
9+
* License, or (at your option) any later version.
10+
*
11+
* This program is distributed in the hope that it will be useful,
12+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
* GNU Affero General Public License for more details.
15+
*
16+
* You should have received a copy of the GNU Affero General Public License
17+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
18+
*
19+
*/
20+
21+
package kafka
22+
23+
import (
24+
"fmt"
25+
"strconv"
26+
"strings"
27+
28+
client "github.com/influxdata/influxdb1-client/v2"
29+
"github.com/loadimpact/k6/lib/types"
30+
"github.com/loadimpact/k6/stats"
31+
"github.com/mitchellh/mapstructure"
32+
"github.com/sirupsen/logrus"
33+
)
34+
35+
type extractTagsToValuesFunc func(map[string]string, map[string]interface{}) map[string]interface{}
36+
37+
// format returns a string array of metrics in influx line-protocol
38+
func formatAsInfluxdbV1(
39+
logger logrus.FieldLogger, samples []stats.Sample, extractTagsToValues extractTagsToValuesFunc,
40+
) ([]string, error) {
41+
var metrics []string
42+
type cacheItem struct {
43+
tags map[string]string
44+
values map[string]interface{}
45+
}
46+
cache := map[*stats.SampleTags]cacheItem{}
47+
for _, sample := range samples {
48+
var tags map[string]string
49+
values := make(map[string]interface{})
50+
if cached, ok := cache[sample.Tags]; ok {
51+
tags = cached.tags
52+
for k, v := range cached.values {
53+
values[k] = v
54+
}
55+
} else {
56+
tags = sample.Tags.CloneTags()
57+
extractTagsToValues(tags, values)
58+
cache[sample.Tags] = cacheItem{tags, values}
59+
}
60+
values["value"] = sample.Value
61+
p, err := client.NewPoint(
62+
sample.Metric.Name,
63+
tags,
64+
values,
65+
sample.Time,
66+
)
67+
if err != nil {
68+
logger.WithError(err).Error("InfluxDB: Couldn't make point from sample!")
69+
return nil, err
70+
}
71+
metrics = append(metrics, p.String())
72+
}
73+
74+
return metrics, nil
75+
}
76+
77+
// FieldKind defines Enum for tag-to-field type conversion
78+
type FieldKind int
79+
80+
const (
81+
// String field (default)
82+
String FieldKind = iota
83+
// Int field
84+
Int
85+
// Float field
86+
Float
87+
// Bool field
88+
Bool
89+
)
90+
91+
func newExtractTagsFields(fieldKinds map[string]FieldKind) extractTagsToValuesFunc {
92+
return func(tags map[string]string, values map[string]interface{}) map[string]interface{} {
93+
for tag, kind := range fieldKinds {
94+
if val, ok := tags[tag]; ok {
95+
var v interface{}
96+
var err error
97+
98+
switch kind {
99+
case String:
100+
v = val
101+
case Bool:
102+
v, err = strconv.ParseBool(val)
103+
case Float:
104+
v, err = strconv.ParseFloat(val, 64)
105+
case Int:
106+
v, err = strconv.ParseInt(val, 10, 64)
107+
}
108+
if err == nil {
109+
values[tag] = v
110+
} else {
111+
values[tag] = val
112+
}
113+
114+
delete(tags, tag)
115+
}
116+
}
117+
return values
118+
}
119+
}
120+
121+
// makeFieldKinds reads the Config and returns a lookup map of tag names to
122+
// the field type their values should be converted to.
123+
func makeInfluxdbFieldKinds(tagsAsFields []string) (map[string]FieldKind, error) {
124+
fieldKinds := make(map[string]FieldKind)
125+
for _, tag := range tagsAsFields {
126+
var fieldName, fieldType string
127+
s := strings.SplitN(tag, ":", 2)
128+
if len(s) == 1 {
129+
fieldName, fieldType = s[0], "string"
130+
} else {
131+
fieldName, fieldType = s[0], s[1]
132+
}
133+
134+
err := checkDuplicatedTypeDefinitions(fieldKinds, fieldName)
135+
if err != nil {
136+
return nil, err
137+
}
138+
139+
switch fieldType {
140+
case "string":
141+
fieldKinds[fieldName] = String
142+
case "bool":
143+
fieldKinds[fieldName] = Bool
144+
case "float":
145+
fieldKinds[fieldName] = Float
146+
case "int":
147+
fieldKinds[fieldName] = Int
148+
default:
149+
return nil, fmt.Errorf("An invalid type (%s) is specified for an InfluxDB field (%s).",
150+
fieldType, fieldName)
151+
}
152+
}
153+
154+
return fieldKinds, nil
155+
}
156+
157+
func checkDuplicatedTypeDefinitions(fieldKinds map[string]FieldKind, tag string) error {
158+
if _, found := fieldKinds[tag]; found {
159+
return fmt.Errorf("A tag name (%s) shows up more than once in InfluxDB field type configurations.", tag)
160+
}
161+
return nil
162+
}
163+
164+
func (c influxdbConfig) Apply(cfg influxdbConfig) influxdbConfig {
165+
if len(cfg.TagsAsFields) > 0 {
166+
c.TagsAsFields = cfg.TagsAsFields
167+
}
168+
return c
169+
}
170+
171+
// ParseMap parses a map[string]interface{} into a Config
172+
func influxdbParseMap(m map[string]interface{}) (influxdbConfig, error) {
173+
c := influxdbConfig{}
174+
if v, ok := m["tagsAsFields"].(string); ok {
175+
m["tagsAsFields"] = []string{v}
176+
}
177+
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
178+
DecodeHook: types.NullDecoder,
179+
Result: &c,
180+
})
181+
if err != nil {
182+
return c, err
183+
}
184+
185+
err = dec.Decode(m)
186+
return c, err
187+
}
188+
189+
type influxdbConfig struct {
190+
TagsAsFields []string `json:"tagsAsFields,omitempty" envconfig:"K6_INFLUXDB_TAGS_AS_FIELDS"`
191+
}
192+
193+
func newInfluxdbConfig() influxdbConfig {
194+
c := influxdbConfig{
195+
TagsAsFields: []string{"vu", "iter", "url"},
196+
}
197+
return c
198+
}

pkg/kafka/format_json.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
*
3+
* k6 - a next-generation load testing tool
4+
* Copyright (C) 2021 Load Impact
5+
*
6+
* This program is free software: you can redistribute it and/or modify
7+
* it under the terms of the GNU Affero General Public License as
8+
* published by the Free Software Foundation, either version 3 of the
9+
* License, or (at your option) any later version.
10+
*
11+
* This program is distributed in the hope that it will be useful,
12+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
* GNU Affero General Public License for more details.
15+
*
16+
* You should have received a copy of the GNU Affero General Public License
17+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
18+
*
19+
*/
20+
21+
package kafka
22+
23+
import (
24+
"time"
25+
26+
"github.com/loadimpact/k6/stats"
27+
)
28+
29+
// wrapSample is used to package a metric sample in a way that's nice to export
30+
// to JSON.
31+
func wrapSample(sample stats.Sample) envolope {
32+
return envolope{
33+
Type: "Point",
34+
Metric: sample.Metric.Name,
35+
Data: newJSONSample(sample),
36+
}
37+
}
38+
39+
// envolope is the data format we use to export both metrics and metric samples
40+
// to the JSON file.
41+
type envolope struct {
42+
Type string `json:"type"`
43+
Data interface{} `json:"data"`
44+
Metric string `json:"metric,omitempty"`
45+
}
46+
47+
// jsonSample is the data format for metric sample data in the JSON file.
48+
type jsonSample struct {
49+
Time time.Time `json:"time"`
50+
Value float64 `json:"value"`
51+
Tags *stats.SampleTags `json:"tags"`
52+
}
53+
54+
func newJSONSample(sample stats.Sample) jsonSample {
55+
return jsonSample{
56+
Time: sample.Time,
57+
Value: sample.Value,
58+
Tags: sample.Tags,
59+
}
60+
}

0 commit comments

Comments
 (0)