Skip to content
This repository was archived by the owner on Jan 21, 2022. It is now read-only.

Commit b14185d

Browse files
committed
added statistics
1 parent 39966a7 commit b14185d

File tree

4 files changed

+265
-23
lines changed

4 files changed

+265
-23
lines changed

cmd/magneticod/dht/mainline/indexingService.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ import (
1010
"go.uber.org/zap"
1111
)
1212

13+
var(
14+
StatsPrintClock = 10*time.Second
15+
)
16+
1317
type IndexingService struct {
1418
// Private
1519
protocol *Protocol

cmd/magneticod/dht/mainline/protocol.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ package mainline
33
import (
44
"crypto/rand"
55
"crypto/sha1"
6+
"github.com/boramalper/magnetico/pkg/util"
67
"net"
8+
"sort"
9+
"strconv"
710
"sync"
811
"time"
912

@@ -16,6 +19,8 @@ type Protocol struct {
1619
transport *Transport
1720
eventHandlers ProtocolEventHandlers
1821
started bool
22+
23+
stats protocolStats
1924
}
2025

2126
type ProtocolEventHandlers struct {
@@ -38,6 +43,9 @@ func NewProtocol(laddr string, eventHandlers ProtocolEventHandlers) (p *Protocol
3843
p = new(Protocol)
3944
p.eventHandlers = eventHandlers
4045
p.transport = NewTransport(laddr, p.onMessage, p.eventHandlers.OnCongestion)
46+
p.stats = protocolStats{
47+
messageTypeCount:make(map[string]map[string]int),
48+
}
4149

4250
p.currentTokenSecret, p.previousTokenSecret = make([]byte, 20), make([]byte, 20)
4351
_, err := rand.Read(p.currentTokenSecret)
@@ -56,6 +64,7 @@ func (p *Protocol) Start() {
5664
p.started = true
5765

5866
p.transport.Start()
67+
go p.printStats()
5968
go p.updateTokenSecret()
6069
}
6170

@@ -67,7 +76,110 @@ func (p *Protocol) Terminate() {
6776
p.transport.Terminate()
6877
}
6978

79+
//statistics
80+
type protocolStats struct{
81+
sync.RWMutex
82+
messageTypeCount map[string]map[string]int //type=>subtype=>count
83+
}
84+
func (ps *protocolStats) Reset(){
85+
ps.Lock()
86+
defer ps.Unlock()
87+
ps.messageTypeCount = make(map[string]map[string]int)
88+
}
89+
type messageTypeCountOrdered struct{
90+
messageType string
91+
messageCount int
92+
percentageOverTotal float64
93+
subMessages orderedMessagesCount
94+
}
95+
type orderedMessagesCount []*messageTypeCountOrdered
96+
func (omc orderedMessagesCount) Len() int {
97+
return len(omc)
98+
}
99+
func (omc orderedMessagesCount) Swap(i, j int) {
100+
omc[i], omc[j] = omc[j], omc[i]
101+
}
102+
func (omc orderedMessagesCount) Less(i, j int) bool {
103+
return omc[i].messageCount > omc[j].messageCount
104+
}
105+
func (omc orderedMessagesCount)CalculatePercentagesOverTotal(totalMessages int){
106+
for _,mtco := range omc{
107+
if mtco.subMessages != nil && len(mtco.subMessages) > 0{
108+
mtco.subMessages.CalculatePercentagesOverTotal(totalMessages)
109+
}
110+
mtco.percentageOverTotal = util.RoundToDecimal(
111+
(float64(mtco.messageCount) / float64(totalMessages)) * 100,2)
112+
}
113+
}
114+
func (omc orderedMessagesCount) Sort(){
115+
for _,mtco := range omc{
116+
if mtco.subMessages != nil && len(mtco.subMessages) > 0{
117+
mtco.subMessages.Sort()
118+
}
119+
}
120+
sort.Sort(omc)
121+
}
122+
func (omc orderedMessagesCount) String() string {
123+
/*
124+
string concatenation is slow, so a bytes.Buffer would be better. But, this is called once every few seconds, so this won't
125+
be a problem and it will be much easier to write down and read
126+
*/
127+
mostReceivedMessageTypes := ""
128+
for mIdx, m := range omc {
129+
if mIdx > 0 {
130+
mostReceivedMessageTypes += ", "
131+
}
132+
mostReceivedMessageTypes += m.messageType
133+
mostReceivedMessageTypes +=
134+
" (" + strconv.Itoa(m.messageCount) + ", " + strconv.FormatFloat(m.percentageOverTotal, 'f', -1, 64) + "%)"
135+
136+
if m.subMessages != nil && len(m.subMessages) > 0 {
137+
//add stats for submessages unless there is only 1 submessage with len 0 (empty)
138+
if ! (len(m.subMessages) == 1 && len(m.subMessages[0].messageType) == 0) {
139+
mostReceivedMessageTypes += "[ " + m.subMessages.String() + " ]"
140+
}
141+
}
142+
}
143+
return mostReceivedMessageTypes
144+
}
145+
func (p *Protocol) printStats() {
146+
for {
147+
time.Sleep(StatsPrintClock)
148+
p.stats.RLock()
149+
orderedMessages := make(orderedMessagesCount, 0, len(p.stats.messageTypeCount))
150+
totalMessages := 0
151+
for mType, mSubTypes := range p.stats.messageTypeCount {
152+
mCount := 0
153+
orderedSubMessages := make(orderedMessagesCount, 0, len(mSubTypes))
154+
for mSubType, mSubCount := range mSubTypes {
155+
mCount += mSubCount
156+
totalMessages += mSubCount
157+
orderedSubMessages = append(orderedSubMessages, &messageTypeCountOrdered{
158+
messageType: mSubType,
159+
messageCount: mSubCount,
160+
})
161+
}
162+
orderedMessages = append(orderedMessages, &messageTypeCountOrdered{
163+
messageType: mType,
164+
messageCount: mCount,
165+
subMessages: orderedSubMessages,
166+
})
167+
}
168+
p.stats.RUnlock()
169+
orderedMessages.CalculatePercentagesOverTotal(totalMessages)
170+
orderedMessages.Sort()
171+
172+
zap.L().Info("Protocol stats (on "+StatsPrintClock.String()+"):",
173+
zap.String("message type", orderedMessages.String()),
174+
)
175+
176+
p.stats.Reset()
177+
}
178+
}
179+
70180
func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
181+
temporaryQ := msg.Q
182+
71183
switch msg.Y {
72184
case "q":
73185
switch msg.Q {
@@ -140,6 +252,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
140252
//
141253
// sample_infohashes > get_peers > find_node > ping / announce_peer
142254
if len(msg.R.Samples) != 0 { // The message should be a sample_infohashes response.
255+
temporaryQ = "sample_infohashes"
143256
if !validateSampleInfohashesResponseMessage(msg) {
144257
// zap.L().Debug("An invalid sample_infohashes response received!")
145258
return
@@ -148,6 +261,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
148261
p.eventHandlers.OnSampleInfohashesResponse(msg, addr)
149262
}
150263
} else if len(msg.R.Token) != 0 { // The message should be a get_peers response.
264+
temporaryQ = "get_peers"
151265
if !validateGetPeersResponseMessage(msg) {
152266
// zap.L().Debug("An invalid get_peers response received!")
153267
return
@@ -156,6 +270,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
156270
p.eventHandlers.OnGetPeersResponse(msg, addr)
157271
}
158272
} else if len(msg.R.Nodes) != 0 { // The message should be a find_node response.
273+
temporaryQ = "find_node"
159274
if !validateFindNodeResponseMessage(msg) {
160275
// zap.L().Debug("An invalid find_node response received!")
161276
return
@@ -164,6 +279,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
164279
p.eventHandlers.OnFindNodeResponse(msg, addr)
165280
}
166281
} else { // The message should be a ping or an announce_peer response.
282+
temporaryQ = "ping_or_announce"
167283
if !validatePingORannouncePeerResponseMessage(msg) {
168284
// zap.L().Debug("An invalid ping OR announce_peer response received!")
169285
return
@@ -182,6 +298,14 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
182298
zap.String("type", msg.Y))
183299
*/
184300
}
301+
302+
//let's update stats at the end so that in case of an "r" message the previous switch case can update the temporaryQ field
303+
p.stats.Lock()
304+
if _, ok := p.stats.messageTypeCount[msg.Y] ; !ok{
305+
p.stats.messageTypeCount[msg.Y] = make(map[string]int)
306+
}
307+
p.stats.messageTypeCount[msg.Y][temporaryQ] ++
308+
p.stats.Unlock()
185309
}
186310

187311
func (p *Protocol) SendMessage(msg *Message, addr *net.UDPAddr) {

0 commit comments

Comments
 (0)