Skip to content
13 changes: 11 additions & 2 deletions kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ func (m ConfigMap) SetKey(key string, value ConfigValue) error {
if !found {
m["default.topic.config"] = ConfigMap{}
}
m["default.topic.config"].(ConfigMap)[strings.TrimPrefix(key, "{topic}.")] = value
if cm, ok := m["default.topic.config"].(ConfigMap); !ok {
return newErrorFromString(ErrInvalidArg, "value type is not a ConfigMap")
} else {
cm[strings.TrimPrefix(key, "{topic}.")] = value
}
} else {
m[key] = value
}
Expand Down Expand Up @@ -228,7 +232,12 @@ func (m ConfigMap) get(key string, defval ConfigValue) (ConfigValue, error) {
if !found {
return defval, nil
}
return defconfCv.(ConfigMap).get(strings.TrimPrefix(key, "{topic}."), defval)

if cm, ok := defconfCv.(ConfigMap); !ok {
return nil, newErrorFromString(ErrInvalidArg, "value type is not a ConfigMap")
} else {
return cm.get(strings.TrimPrefix(key, "{topic}."), defval)
}
}

v, ok := m[key]
Expand Down
29 changes: 29 additions & 0 deletions kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,35 @@ func TestConfigMapAPIs(t *testing.T) {
t.Errorf("Expected nil for dummy value, got %v\n", v)
}

config2 := &ConfigMap{
"default.topic.config": "dummy",
}
_, err = config2.Get("{topic}.produce.offset.report", false)
if err == nil {
t.Errorf("Expected Get({topic}.produce.offset.report) to fail\n")
}
err = config2.SetKey("{topic}.produce.offset.report", true)
if err == nil {
t.Errorf("Expected SetKey({topic}.produce.offset.report) to fail\n")
}

config3 := &ConfigMap{
"default.topic.config": ConfigMap{},
}
err = config3.SetKey("{topic}.produce.offset.report", true)
if err != nil {
t.Errorf("Expected SetKey({topic}.produce.offset.report) to succeed: %s\n", err)
}
v, err = config3.Get("{topic}.produce.offset.report", false)
if err != nil {
t.Errorf("Expected Get({topic}.produce.offset.report) to succeed: %s\n", err)
}
if v == nil {
t.Errorf("Expected Get({topic}.produce.offset.report) to return non-nil value\n")
}
if v.(bool) != true {
t.Errorf("produce.offset.report mismatch: %v\n", v.(bool))
}
}

// Test that plugins will always be configured before their config options
Expand Down