Skip to content

Commit e0bcf7e

Browse files
authored
Check version when version is not (#345)
* Check version when version is not set and is less than 3.11 * Reduce the log level for the exchange version --------- Signed-off-by: Gabriele Santomaggio <g.santomaggio@gmail.com>
1 parent e4f6b06 commit e0bcf7e

File tree

2 files changed

+8
-4
lines changed

2 files changed

+8
-4
lines changed

pkg/stream/available_features.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (a *availableFeatures) BrokerFilterEnabled() bool {
4545
func (a *availableFeatures) IsBrokerSingleActiveConsumerEnabled() bool {
4646
lock.Lock()
4747
defer lock.Unlock()
48-
return a.brokerSingleActiveConsumerEnabled == a.is311OrMore
48+
return a.brokerSingleActiveConsumerEnabled
4949
}
5050

5151
func (a *availableFeatures) SetVersion(version string) error {

pkg/stream/client.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,13 @@ func (c *Client) connect() error {
212212
return err2
213213
}
214214

215-
logs.LogDebug("Server properties: %s", c.serverProperties)
216-
if serverProperties["version"] == "" {
217-
logs.LogInfo(
215+
err = c.availableFeatures.SetVersion(serverProperties["version"])
216+
if err != nil {
217+
logs.LogWarn("Error checking server version: %s", err)
218+
}
219+
220+
if serverProperties["version"] == "" || !c.availableFeatures.Is311OrMore() {
221+
logs.LogDebug(
218222
"Server version is less than 3.11.0, skipping command version exchange")
219223
} else {
220224
err := c.exchangeVersion(c.serverProperties["version"])

0 commit comments

Comments
 (0)