Skip to content

Commit d19a9f8

Browse files
thomas-bousquetnodeceCopilot
authored
feat: add WithContext() admin client methods (#1425)
* feat: add WithContext() admin client methods * docs: update missed client packages methods comments * docs: fix typo * feat: add WithContext() methods to admin client. chore: satisfy linter rules * chore: fix last linter issues * Use background instead of todo Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix: use context.Background() * style: capitalized comment --------- Co-authored-by: Zixuan Liu <nodeces@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent bddaeba commit d19a9f8

File tree

17 files changed

+4117
-491
lines changed

17 files changed

+4117
-491
lines changed

pulsaradmin/pkg/admin/broker_stats.go

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package admin
1919

2020
import (
21+
"context"
22+
2123
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
2224
)
2325

@@ -26,17 +28,32 @@ type BrokerStats interface {
2628
// GetMetrics returns Monitoring metrics
2729
GetMetrics() ([]utils.Metrics, error)
2830

31+
// GetMetricsWithContext returns Monitoring metrics
32+
GetMetricsWithContext(context.Context) ([]utils.Metrics, error)
33+
2934
// GetMBeans requests JSON string server mbean dump
3035
GetMBeans() ([]utils.Metrics, error)
3136

37+
// GetMBeansWithContext requests JSON string server mbean dump
38+
GetMBeansWithContext(context.Context) ([]utils.Metrics, error)
39+
3240
// GetTopics returns JSON string topics stats
3341
GetTopics() (string, error)
3442

43+
// GetTopicsWithContext returns JSON string topics stats
44+
GetTopicsWithContext(context.Context) (string, error)
45+
3546
// GetLoadReport returns load report of broker
3647
GetLoadReport() (*utils.LocalBrokerData, error)
3748

49+
// GetLoadReport returns load report of broker
50+
GetLoadReportWithContext(context.Context) (*utils.LocalBrokerData, error)
51+
3852
// GetAllocatorStats returns stats from broker
3953
GetAllocatorStats(allocatorName string) (*utils.AllocatorStats, error)
54+
55+
// GetAllocatorStatsWithContext returns stats from broker
56+
GetAllocatorStatsWithContext(ctx context.Context, allocatorName string) (*utils.AllocatorStats, error)
4057
}
4158

4259
type brokerStats struct {
@@ -53,9 +70,13 @@ func (c *pulsarClient) BrokerStats() BrokerStats {
5370
}
5471

5572
func (bs *brokerStats) GetMetrics() ([]utils.Metrics, error) {
73+
return bs.GetMetricsWithContext(context.Background())
74+
}
75+
76+
func (bs *brokerStats) GetMetricsWithContext(ctx context.Context) ([]utils.Metrics, error) {
5677
endpoint := bs.pulsar.endpoint(bs.basePath, "/metrics")
5778
var response []utils.Metrics
58-
err := bs.pulsar.Client.Get(endpoint, &response)
79+
err := bs.pulsar.Client.GetWithContext(ctx, endpoint, &response)
5980
if err != nil {
6081
return nil, err
6182
}
@@ -64,9 +85,13 @@ func (bs *brokerStats) GetMetrics() ([]utils.Metrics, error) {
6485
}
6586

6687
func (bs *brokerStats) GetMBeans() ([]utils.Metrics, error) {
88+
return bs.GetMBeansWithContext(context.Background())
89+
}
90+
91+
func (bs *brokerStats) GetMBeansWithContext(ctx context.Context) ([]utils.Metrics, error) {
6792
endpoint := bs.pulsar.endpoint(bs.basePath, "/mbeans")
6893
var response []utils.Metrics
69-
err := bs.pulsar.Client.Get(endpoint, &response)
94+
err := bs.pulsar.Client.GetWithContext(ctx, endpoint, &response)
7095
if err != nil {
7196
return nil, err
7297
}
@@ -75,8 +100,12 @@ func (bs *brokerStats) GetMBeans() ([]utils.Metrics, error) {
75100
}
76101

77102
func (bs *brokerStats) GetTopics() (string, error) {
103+
return bs.GetTopicsWithContext(context.Background())
104+
}
105+
106+
func (bs *brokerStats) GetTopicsWithContext(ctx context.Context) (string, error) {
78107
endpoint := bs.pulsar.endpoint(bs.basePath, "/topics")
79-
buf, err := bs.pulsar.Client.GetWithQueryParams(endpoint, nil, nil, false)
108+
buf, err := bs.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, nil, nil, false)
80109
if err != nil {
81110
return "", err
82111
}
@@ -85,19 +114,30 @@ func (bs *brokerStats) GetTopics() (string, error) {
85114
}
86115

87116
func (bs *brokerStats) GetLoadReport() (*utils.LocalBrokerData, error) {
117+
return bs.GetLoadReportWithContext(context.Background())
118+
}
119+
120+
func (bs *brokerStats) GetLoadReportWithContext(ctx context.Context) (*utils.LocalBrokerData, error) {
88121
endpoint := bs.pulsar.endpoint(bs.basePath, "/load-report")
89122
response := utils.NewLocalBrokerData()
90-
err := bs.pulsar.Client.Get(endpoint, &response)
123+
err := bs.pulsar.Client.GetWithContext(ctx, endpoint, &response)
91124
if err != nil {
92125
return nil, nil
93126
}
94127
return &response, nil
95128
}
96129

97130
func (bs *brokerStats) GetAllocatorStats(allocatorName string) (*utils.AllocatorStats, error) {
131+
return bs.GetAllocatorStatsWithContext(context.Background(), allocatorName)
132+
}
133+
134+
func (bs *brokerStats) GetAllocatorStatsWithContext(
135+
ctx context.Context,
136+
allocatorName string,
137+
) (*utils.AllocatorStats, error) {
98138
endpoint := bs.pulsar.endpoint(bs.basePath, "/allocator-stats", allocatorName)
99139
var allocatorStats utils.AllocatorStats
100-
err := bs.pulsar.Client.Get(endpoint, &allocatorStats)
140+
err := bs.pulsar.Client.GetWithContext(ctx, endpoint, &allocatorStats)
101141
if err != nil {
102142
return nil, err
103143
}

pulsaradmin/pkg/admin/brokers.go

Lines changed: 114 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package admin
1919

2020
import (
21+
"context"
2122
"fmt"
2223
"strings"
2324

@@ -27,42 +28,85 @@ import (
2728
// Brokers is admin interface for brokers management
2829
type Brokers interface {
2930

30-
// GetListActiveBrokers Get the list of active brokers in the local cluster.
31+
// GetListActiveBrokers returns the list of active brokers in the local cluster.
3132
GetListActiveBrokers() ([]string, error)
33+
34+
// GetListActiveBrokersWithContext returns the list of active brokers in the local cluster.
35+
GetListActiveBrokersWithContext(context.Context) ([]string, error)
36+
3237
// GetActiveBrokers returns the list of active brokers in the cluster.
3338
GetActiveBrokers(cluster string) ([]string, error)
3439

40+
// GetActiveBrokersWithContext returns the list of active brokers in the cluster.
41+
GetActiveBrokersWithContext(ctx context.Context, cluster string) ([]string, error)
42+
3543
// GetDynamicConfigurationNames returns list of updatable configuration name
3644
GetDynamicConfigurationNames() ([]string, error)
3745

46+
// GetDynamicConfigurationNamesWithContext returns list of updatable configuration name
47+
GetDynamicConfigurationNamesWithContext(context.Context) ([]string, error)
48+
3849
// GetOwnedNamespaces returns the map of owned namespaces and their status from a single broker in the cluster
3950
GetOwnedNamespaces(cluster, brokerURL string) (map[string]utils.NamespaceOwnershipStatus, error)
4051

52+
// GetOwnedNamespaces returns the map of owned namespaces and their status from a single broker in the cluster
53+
GetOwnedNamespacesWithContext(
54+
ctx context.Context,
55+
cluster,
56+
brokerURL string,
57+
) (map[string]utils.NamespaceOwnershipStatus, error)
58+
4159
// UpdateDynamicConfiguration updates dynamic configuration value in to Zk that triggers watch on
4260
// brokers and all brokers can update {@link ServiceConfiguration} value locally
4361
UpdateDynamicConfiguration(configName, configValue string) error
4462

63+
// UpdateDynamicConfigurationWithContext updates dynamic configuration value in to Zk that triggers watch on
64+
// brokers and all brokers can update {@link ServiceConfiguration} value locally
65+
UpdateDynamicConfigurationWithContext(ctx context.Context, configName, configValue string) error
66+
4567
// DeleteDynamicConfiguration deletes dynamic configuration value in to Zk. It will not impact current value
4668
// in broker but next time when broker restarts, it applies value from configuration file only.
4769
DeleteDynamicConfiguration(configName string) error
4870

71+
// DeleteDynamicConfigurationWithContext deletes dynamic configuration value in to Zk. It will not impact current value
72+
// in broker but next time when broker restarts, it applies value from configuration file only.
73+
DeleteDynamicConfigurationWithContext(ctx context.Context, configName string) error
74+
4975
// GetRuntimeConfigurations returns values of runtime configuration
5076
GetRuntimeConfigurations() (map[string]string, error)
5177

78+
// GetRuntimeConfigurationsWithContext returns values of runtime configuration
79+
GetRuntimeConfigurationsWithContext(context.Context) (map[string]string, error)
80+
5281
// GetInternalConfigurationData returns the internal configuration data
5382
GetInternalConfigurationData() (*utils.InternalConfigurationData, error)
5483

84+
// GetInternalConfigurationDataWithContext returns the internal configuration data
85+
GetInternalConfigurationDataWithContext(context.Context) (*utils.InternalConfigurationData, error)
86+
5587
// GetAllDynamicConfigurations returns values of all overridden dynamic-configs
5688
GetAllDynamicConfigurations() (map[string]string, error)
5789

90+
// GetAllDynamicConfigurationsWithContext returns values of all overridden dynamic-configs
91+
GetAllDynamicConfigurationsWithContext(context.Context) (map[string]string, error)
92+
5893
// Deprecated: Use HealthCheckWithTopicVersion instead
5994
HealthCheck() error
6095

61-
// HealthCheckWithTopicVersion run a health check on the broker
96+
// Deprecated: Use HealthCheckWithTopicVersionWithContext instead
97+
HealthCheckWithContext(context.Context) error
98+
99+
// HealthCheckWithTopicVersion runs a health check on the broker
62100
HealthCheckWithTopicVersion(utils.TopicVersion) error
63101

102+
// HealthCheckWithTopicVersionWithContext runs a health check on the broker
103+
HealthCheckWithTopicVersionWithContext(context.Context, utils.TopicVersion) error
104+
64105
// GetLeaderBroker get the information of the leader broker.
65106
GetLeaderBroker() (utils.BrokerInfo, error)
107+
108+
// GetLeaderBrokerWithContext returns the information of the leader broker.
109+
GetLeaderBrokerWithContext(context.Context) (utils.BrokerInfo, error)
66110
}
67111

68112
type broker struct {
@@ -79,93 +123,144 @@ func (c *pulsarClient) Brokers() Brokers {
79123
}
80124

81125
func (b *broker) GetActiveBrokers(cluster string) ([]string, error) {
126+
return b.GetActiveBrokersWithContext(context.Background(), cluster)
127+
}
128+
129+
func (b *broker) GetActiveBrokersWithContext(ctx context.Context, cluster string) ([]string, error) {
82130
endpoint := b.pulsar.endpoint(b.basePath, cluster)
83131
var res []string
84-
err := b.pulsar.Client.Get(endpoint, &res)
132+
err := b.pulsar.Client.GetWithContext(ctx, endpoint, &res)
85133
if err != nil {
86134
return nil, err
87135
}
88136
return res, nil
89137
}
90138

91139
func (b *broker) GetListActiveBrokers() ([]string, error) {
140+
return b.GetListActiveBrokersWithContext(context.Background())
141+
}
142+
143+
func (b *broker) GetListActiveBrokersWithContext(ctx context.Context) ([]string, error) {
92144
endpoint := b.pulsar.endpoint(b.basePath)
93145
var res []string
94-
err := b.pulsar.Client.Get(endpoint, &res)
146+
err := b.pulsar.Client.GetWithContext(ctx, endpoint, &res)
95147
if err != nil {
96148
return nil, err
97149
}
98150
return res, nil
99151
}
100152

101153
func (b *broker) GetDynamicConfigurationNames() ([]string, error) {
154+
return b.GetDynamicConfigurationNamesWithContext(context.Background())
155+
}
156+
157+
func (b *broker) GetDynamicConfigurationNamesWithContext(ctx context.Context) ([]string, error) {
102158
endpoint := b.pulsar.endpoint(b.basePath, "/configuration/")
103159
var res []string
104-
err := b.pulsar.Client.Get(endpoint, &res)
160+
err := b.pulsar.Client.GetWithContext(ctx, endpoint, &res)
105161
if err != nil {
106162
return nil, err
107163
}
108164
return res, nil
109165
}
110166

111167
func (b *broker) GetOwnedNamespaces(cluster, brokerURL string) (map[string]utils.NamespaceOwnershipStatus, error) {
168+
return b.GetOwnedNamespacesWithContext(context.Background(), cluster, brokerURL)
169+
}
170+
171+
func (b *broker) GetOwnedNamespacesWithContext(
172+
ctx context.Context,
173+
cluster,
174+
brokerURL string,
175+
) (map[string]utils.NamespaceOwnershipStatus, error) {
112176
endpoint := b.pulsar.endpoint(b.basePath, cluster, brokerURL, "ownedNamespaces")
113177
var res map[string]utils.NamespaceOwnershipStatus
114-
err := b.pulsar.Client.Get(endpoint, &res)
178+
err := b.pulsar.Client.GetWithContext(ctx, endpoint, &res)
115179
if err != nil {
116180
return nil, err
117181
}
118182
return res, nil
119183
}
120184

121185
func (b *broker) UpdateDynamicConfiguration(configName, configValue string) error {
186+
return b.UpdateDynamicConfigurationWithContext(context.Background(), configName, configValue)
187+
}
188+
189+
func (b *broker) UpdateDynamicConfigurationWithContext(ctx context.Context, configName, configValue string) error {
122190
value := fmt.Sprintf("/configuration/%s/%s", configName, configValue)
123191
endpoint := b.pulsar.endpointWithFullPath(b.basePath, value)
124-
return b.pulsar.Client.Post(endpoint, nil)
192+
return b.pulsar.Client.PostWithContext(ctx, endpoint, nil)
125193
}
126194

127195
func (b *broker) DeleteDynamicConfiguration(configName string) error {
196+
return b.DeleteDynamicConfigurationWithContext(context.Background(), configName)
197+
}
198+
199+
func (b *broker) DeleteDynamicConfigurationWithContext(ctx context.Context, configName string) error {
128200
endpoint := b.pulsar.endpoint(b.basePath, "/configuration/", configName)
129-
return b.pulsar.Client.Delete(endpoint)
201+
return b.pulsar.Client.DeleteWithContext(ctx, endpoint)
130202
}
131203

132204
func (b *broker) GetRuntimeConfigurations() (map[string]string, error) {
205+
return b.GetRuntimeConfigurationsWithContext(context.Background())
206+
}
207+
208+
func (b *broker) GetRuntimeConfigurationsWithContext(ctx context.Context) (map[string]string, error) {
133209
endpoint := b.pulsar.endpoint(b.basePath, "/configuration/", "runtime")
134210
var res map[string]string
135-
err := b.pulsar.Client.Get(endpoint, &res)
211+
err := b.pulsar.Client.GetWithContext(ctx, endpoint, &res)
136212
if err != nil {
137213
return nil, err
138214
}
139215
return res, nil
140216
}
141217

142218
func (b *broker) GetInternalConfigurationData() (*utils.InternalConfigurationData, error) {
219+
return b.GetInternalConfigurationDataWithContext(context.Background())
220+
}
221+
222+
func (b *broker) GetInternalConfigurationDataWithContext(
223+
ctx context.Context,
224+
) (*utils.InternalConfigurationData, error) {
143225
endpoint := b.pulsar.endpoint(b.basePath, "/internal-configuration")
144226
var res utils.InternalConfigurationData
145-
err := b.pulsar.Client.Get(endpoint, &res)
227+
err := b.pulsar.Client.GetWithContext(ctx, endpoint, &res)
146228
if err != nil {
147229
return nil, err
148230
}
149231
return &res, nil
150232
}
151233

152234
func (b *broker) GetAllDynamicConfigurations() (map[string]string, error) {
235+
return b.GetAllDynamicConfigurationsWithContext(context.Background())
236+
}
237+
238+
func (b *broker) GetAllDynamicConfigurationsWithContext(ctx context.Context) (map[string]string, error) {
153239
endpoint := b.pulsar.endpoint(b.basePath, "/configuration/", "values")
154240
var res map[string]string
155-
err := b.pulsar.Client.Get(endpoint, &res)
241+
err := b.pulsar.Client.GetWithContext(ctx, endpoint, &res)
156242
if err != nil {
157243
return nil, err
158244
}
159245
return res, nil
160246
}
161247

162248
func (b *broker) HealthCheck() error {
163-
return b.HealthCheckWithTopicVersion(utils.TopicVersionV1)
249+
return b.HealthCheckWithContext(context.Background())
164250
}
251+
252+
func (b *broker) HealthCheckWithContext(ctx context.Context) error {
253+
return b.HealthCheckWithTopicVersionWithContext(ctx, utils.TopicVersionV1)
254+
}
255+
165256
func (b *broker) HealthCheckWithTopicVersion(topicVersion utils.TopicVersion) error {
257+
return b.HealthCheckWithTopicVersionWithContext(context.Background(), topicVersion)
258+
}
259+
260+
func (b *broker) HealthCheckWithTopicVersionWithContext(ctx context.Context, topicVersion utils.TopicVersion) error {
166261
endpoint := b.pulsar.endpoint(b.basePath, "/health")
167262

168-
buf, err := b.pulsar.Client.GetWithQueryParams(endpoint, nil, map[string]string{
263+
buf, err := b.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, nil, map[string]string{
169264
"topicVersion": topicVersion.String(),
170265
}, false)
171266
if err != nil {
@@ -177,10 +272,15 @@ func (b *broker) HealthCheckWithTopicVersion(topicVersion utils.TopicVersion) er
177272
}
178273
return nil
179274
}
275+
180276
func (b *broker) GetLeaderBroker() (utils.BrokerInfo, error) {
277+
return b.GetLeaderBrokerWithContext(context.Background())
278+
}
279+
280+
func (b *broker) GetLeaderBrokerWithContext(ctx context.Context) (utils.BrokerInfo, error) {
181281
endpoint := b.pulsar.endpoint(b.basePath, "/leaderBroker")
182282
var brokerInfo utils.BrokerInfo
183-
err := b.pulsar.Client.Get(endpoint, &brokerInfo)
283+
err := b.pulsar.Client.GetWithContext(ctx, endpoint, &brokerInfo)
184284
if err != nil {
185285
return brokerInfo, err
186286
}

0 commit comments

Comments
 (0)