Skip to content

Commit 5745678

Browse files
authored
Merge pull request #1247 from LiZhenCheng9527/refactor-dns
Refactor dns
2 parents 57c2177 + 2f054db commit 5745678

File tree

8 files changed

+924
-659
lines changed

8 files changed

+924
-659
lines changed

pkg/controller/ads/ads_controller.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ var (
3333
)
3434

3535
type Controller struct {
36-
Processor *processor
37-
con *connection
36+
Processor *processor
37+
dnsResolverController *dnsController
38+
con *connection
3839
}
3940

4041
type connection struct {
@@ -44,8 +45,18 @@ type connection struct {
4445
}
4546

4647
func NewController(bpfAds *bpfads.BpfAds) *Controller {
48+
processor := newProcessor(bpfAds)
49+
// create kernel-native mode ads resolver controller
50+
dnsResolverController, err := NewDnsController(processor.Cache)
51+
if err != nil {
52+
log.Errorf("dns resolver of Kernel-Native mode create failed: %v", err)
53+
return nil
54+
}
55+
processor.DnsResolverChan = dnsResolverController.clustersChan
56+
4757
return &Controller{
48-
Processor: newProcessor(bpfAds),
58+
dnsResolverController: dnsResolverController,
59+
Processor: processor,
4960
}
5061
}
5162

@@ -84,6 +95,9 @@ func (c *Controller) HandleAdsStream() error {
8495
return fmt.Errorf("stream recv failed, %s", err)
8596
}
8697

98+
// Because Kernel-Native mode is full update.
99+
// So the original clusterCache is deleted when a new resp is received.
100+
c.dnsResolverController.newClusterCache()
87101
c.Processor.processAdsResponse(rsp)
88102
c.con.requestsChan.Put(c.Processor.ack)
89103
if c.Processor.req != nil {
@@ -115,3 +129,9 @@ func (c *Controller) Close() {
115129
_ = c.con.Stream.CloseSend()
116130
}
117131
}
132+
133+
func (c *Controller) StartDnsController(stopCh <-chan struct{}) {
134+
if c.dnsResolverController != nil {
135+
c.dnsResolverController.Run(stopCh)
136+
}
137+
}

pkg/controller/ads/ads_controller_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,8 @@ func TestHandleAdsStream(t *testing.T) {
119119

120120
adsStream := NewController(nil)
121121
adsStream.con = &connection{Stream: fakeClient.AdsClient, requestsChan: channels.NewUnbounded[*service_discovery_v3.DiscoveryRequest](), stopCh: make(chan struct{})}
122-
122+
adsStream.dnsResolverController.Run(make(chan struct{}))
123123
patches1 := gomonkey.NewPatches()
124-
patches2 := gomonkey.NewPatches()
125124
tests := []struct {
126125
name string
127126
beforeFunc func()
@@ -161,7 +160,6 @@ func TestHandleAdsStream(t *testing.T) {
161160
},
162161
afterFunc: func() {
163162
patches1.Reset()
164-
patches2.Reset()
165163
},
166164
wantErr: false,
167165
},

pkg/controller/ads/dns.go

Lines changed: 320 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,320 @@
1+
/*
2+
* Copyright The Kmesh Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package ads
18+
19+
import (
20+
"fmt"
21+
"net"
22+
"net/netip"
23+
"slices"
24+
"sync"
25+
"time"
26+
27+
clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
28+
v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
29+
endpointv3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
30+
"google.golang.org/protobuf/proto"
31+
"google.golang.org/protobuf/types/known/wrapperspb"
32+
33+
core_v2 "kmesh.net/kmesh/api/v2/core"
34+
"kmesh.net/kmesh/pkg/dns"
35+
)
36+
37+
// adsDnsResolver is DNS resolver of Kernel Native
38+
type dnsController struct {
39+
clustersChan chan []*clusterv3.Cluster
40+
cache *AdsCache
41+
dnsResolver *dns.DNSResolver
42+
// Store the copy of pendingResolveDomain.
43+
clusterCache map[string]*pendingResolveDomain
44+
// store all pending hostnames in the clusters
45+
pendingHostnames map[string][]string
46+
sync.RWMutex
47+
}
48+
49+
// pending resolve domain info of Kennel-Native Mode,
50+
// cluster is used for create the apicluster
51+
type pendingResolveDomain struct {
52+
Clusters []*clusterv3.Cluster
53+
RefreshRate time.Duration
54+
}
55+
56+
func NewDnsController(adsCache *AdsCache) (*dnsController, error) {
57+
resolver, err := dns.NewDNSResolver()
58+
if err != nil {
59+
return nil, err
60+
}
61+
return &dnsController{
62+
clustersChan: make(chan []*clusterv3.Cluster),
63+
cache: adsCache,
64+
dnsResolver: resolver,
65+
clusterCache: make(map[string]*pendingResolveDomain),
66+
pendingHostnames: make(map[string][]string),
67+
}, nil
68+
}
69+
70+
func (r *dnsController) Run(stopCh <-chan struct{}) {
71+
// Start dns resolver
72+
go r.dnsResolver.StartDnsResolver(stopCh)
73+
// Handle cds updates
74+
go r.refreshWorker(stopCh)
75+
// Consumption of clusters.
76+
go r.processClusters()
77+
go func() {
78+
<-stopCh
79+
close(r.clustersChan)
80+
}()
81+
}
82+
83+
func (r *dnsController) processClusters() {
84+
for clusters := range r.clustersChan {
85+
r.processDomains(clusters)
86+
}
87+
}
88+
89+
func (r *dnsController) processDomains(cds []*clusterv3.Cluster) {
90+
domains := getPendingResolveDomain(cds)
91+
92+
// store all pending hostnames of clusters in pendingHostnames
93+
for _, cluster := range cds {
94+
clusterName := cluster.GetName()
95+
info := getHostName(cluster)
96+
r.pendingHostnames[clusterName] = info
97+
}
98+
99+
// delete any scheduled re-resolve for domains we no longer care about
100+
r.dnsResolver.RemoveUnwatchDomain(domains)
101+
102+
// Update clusters based on the data in the dns cache.
103+
for k, v := range domains {
104+
addresses := r.dnsResolver.GetDNSAddresses(k)
105+
// Already have record in dns cache
106+
if addresses != nil {
107+
// Use a goroutine to update the Cluster, reducing the processing time of functions
108+
// Avoiding clusterChan blocking
109+
go r.updateClusters(v.(*pendingResolveDomain), k, addresses)
110+
} else {
111+
// Initialize the newly added hostname
112+
// and add it to the dns queue to be resolved.
113+
domainInfo := &dns.DomainInfo{
114+
Domain: k,
115+
RefreshRate: v.(*pendingResolveDomain).RefreshRate,
116+
}
117+
r.dnsResolver.AddDomainInQueue(domainInfo, 0)
118+
}
119+
}
120+
}
121+
122+
// Handle cds updates
123+
func (r *dnsController) refreshWorker(stop <-chan struct{}) {
124+
for {
125+
select {
126+
case <-stop:
127+
return
128+
case domain := <-r.dnsResolver.DnsChan:
129+
pendingDomain := r.getClustersByDomain(domain)
130+
addrs := r.dnsResolver.GetDNSAddresses(domain)
131+
r.updateClusters(pendingDomain, domain, addrs)
132+
}
133+
}
134+
}
135+
136+
func (r *dnsController) updateClusters(pendingDomain *pendingResolveDomain, domain string, addrs []string) {
137+
isClusterUpdate := false
138+
if pendingDomain == nil || addrs == nil {
139+
return
140+
}
141+
for _, cluster := range pendingDomain.Clusters {
142+
ready, newCluster := r.overwriteDnsCluster(cluster, domain, addrs)
143+
if ready {
144+
if !r.cache.UpdateApiClusterIfExists(core_v2.ApiStatus_UPDATE, newCluster) {
145+
log.Debugf("cluster: %s is deleted", cluster.Name)
146+
} else {
147+
isClusterUpdate = true
148+
}
149+
}
150+
}
151+
// if one cluster update successful, we will retuen true
152+
if isClusterUpdate {
153+
r.cache.ClusterCache.Flush()
154+
}
155+
}
156+
157+
func (r *dnsController) overwriteDnsCluster(cluster *clusterv3.Cluster, domain string, addrs []string) (bool, *clusterv3.Cluster) {
158+
ready := true
159+
hostNames := r.pendingHostnames[cluster.GetName()]
160+
addressesOfHostname := make(map[string][]string)
161+
162+
for _, hostName := range hostNames {
163+
addresses := r.dnsResolver.GetDNSAddresses(hostName)
164+
// There are hostnames in this Cluster that are not resolved.
165+
if addresses != nil {
166+
addressesOfHostname[hostName] = addresses
167+
} else {
168+
ready = false
169+
}
170+
}
171+
172+
if ready {
173+
newCluster := cloneCluster(cluster)
174+
for _, e := range newCluster.LoadAssignment.Endpoints {
175+
pos := -1
176+
var lbEndpoints []*endpointv3.LbEndpoint
177+
for i, le := range e.LbEndpoints {
178+
socketAddr, ok := le.GetEndpoint().GetAddress().GetAddress().(*v3.Address_SocketAddress)
179+
if !ok {
180+
continue
181+
}
182+
_, err := netip.ParseAddr(socketAddr.SocketAddress.Address)
183+
if err != nil {
184+
host := socketAddr.SocketAddress.Address
185+
addresses := addressesOfHostname[host]
186+
fmt.Printf("addresses %#v", addresses)
187+
pos = i
188+
lbEndpoints = buildLbEndpoints(socketAddr.SocketAddress.GetPortValue(), addresses)
189+
}
190+
}
191+
e.LbEndpoints = slices.Replace(e.LbEndpoints, pos, pos+1, lbEndpoints...)
192+
}
193+
return ready, newCluster
194+
}
195+
196+
return ready, nil
197+
}
198+
199+
func buildLbEndpoints(port uint32, addrs []string) []*endpointv3.LbEndpoint {
200+
lbEndpoints := make([]*endpointv3.LbEndpoint, 0, len(addrs))
201+
for _, addr := range addrs {
202+
ip := net.ParseIP(addr)
203+
if ip == nil {
204+
continue
205+
}
206+
if ip.To4() == nil {
207+
continue
208+
}
209+
lbEndpoint := &endpointv3.LbEndpoint{
210+
HealthStatus: v3.HealthStatus_HEALTHY,
211+
HostIdentifier: &endpointv3.LbEndpoint_Endpoint{
212+
Endpoint: &endpointv3.Endpoint{
213+
Address: &v3.Address{
214+
Address: &v3.Address_SocketAddress{
215+
SocketAddress: &v3.SocketAddress{
216+
Address: addr,
217+
PortSpecifier: &v3.SocketAddress_PortValue{
218+
PortValue: port,
219+
},
220+
},
221+
},
222+
},
223+
},
224+
},
225+
// TODO: support LoadBalancingWeight
226+
LoadBalancingWeight: &wrapperspb.UInt32Value{
227+
Value: 1,
228+
},
229+
}
230+
lbEndpoints = append(lbEndpoints, lbEndpoint)
231+
}
232+
return lbEndpoints
233+
}
234+
235+
// Get the hostname to be resolved in Cluster
236+
func getHostName(cluster *clusterv3.Cluster) []string {
237+
info := []string{}
238+
for _, e := range cluster.LoadAssignment.Endpoints {
239+
for _, le := range e.LbEndpoints {
240+
socketAddr, ok := le.GetEndpoint().GetAddress().GetAddress().(*v3.Address_SocketAddress)
241+
if !ok {
242+
continue
243+
}
244+
_, err := netip.ParseAddr(socketAddr.SocketAddress.Address)
245+
if err != nil {
246+
info = append(info, socketAddr.SocketAddress.Address)
247+
}
248+
}
249+
}
250+
251+
return info
252+
}
253+
254+
func getPendingResolveDomain(cds []*clusterv3.Cluster) map[string]interface{} {
255+
domains := make(map[string]interface{})
256+
// hostNames := make(map[string]struct{})
257+
258+
for _, cluster := range cds {
259+
if cluster.LoadAssignment == nil {
260+
continue
261+
}
262+
263+
for _, e := range cluster.LoadAssignment.Endpoints {
264+
for _, le := range e.LbEndpoints {
265+
socketAddr, ok := le.GetEndpoint().GetAddress().GetAddress().(*v3.Address_SocketAddress)
266+
if !ok {
267+
continue
268+
}
269+
address := socketAddr.SocketAddress.Address
270+
if _, err := netip.ParseAddr(address); err == nil {
271+
// This is an ip address
272+
continue
273+
}
274+
275+
if v, ok := domains[address]; ok {
276+
v.(*pendingResolveDomain).Clusters = append(v.(*pendingResolveDomain).Clusters, cluster)
277+
} else {
278+
domainWithRefreshRate := &pendingResolveDomain{
279+
Clusters: []*clusterv3.Cluster{cluster},
280+
RefreshRate: cluster.GetDnsRefreshRate().AsDuration(),
281+
}
282+
domains[address] = domainWithRefreshRate
283+
}
284+
}
285+
}
286+
}
287+
288+
return domains
289+
}
290+
291+
func (r *dnsController) newClusterCache() {
292+
r.Lock()
293+
defer r.Unlock()
294+
295+
if r.clusterCache != nil {
296+
log.Debug("clean up dns clusters")
297+
r.clusterCache = map[string]*pendingResolveDomain{}
298+
return
299+
}
300+
}
301+
302+
func (r *dnsController) getClustersByDomain(domain string) *pendingResolveDomain {
303+
r.RLock()
304+
defer r.RUnlock()
305+
306+
if r.clusterCache != nil {
307+
if v, ok := r.clusterCache[domain]; ok {
308+
return v
309+
}
310+
}
311+
return nil
312+
}
313+
314+
func cloneCluster(cluster *clusterv3.Cluster) *clusterv3.Cluster {
315+
if cluster == nil {
316+
return nil
317+
}
318+
clusterCopy := proto.Clone(cluster).(*clusterv3.Cluster)
319+
return clusterCopy
320+
}

0 commit comments

Comments
 (0)