@@ -3,7 +3,10 @@ package mainline
3
3
import (
4
4
"crypto/rand"
5
5
"crypto/sha1"
6
+ "github.com/boramalper/magnetico/pkg/util"
6
7
"net"
8
+ "sort"
9
+ "strconv"
7
10
"sync"
8
11
"time"
9
12
@@ -16,6 +19,8 @@ type Protocol struct {
16
19
transport * Transport
17
20
eventHandlers ProtocolEventHandlers
18
21
started bool
22
+
23
+ stats protocolStats
19
24
}
20
25
21
26
type ProtocolEventHandlers struct {
@@ -38,6 +43,9 @@ func NewProtocol(laddr string, eventHandlers ProtocolEventHandlers) (p *Protocol
38
43
p = new (Protocol )
39
44
p .eventHandlers = eventHandlers
40
45
p .transport = NewTransport (laddr , p .onMessage , p .eventHandlers .OnCongestion )
46
+ p .stats = protocolStats {
47
+ messageTypeCount : make (map [string ]map [string ]int ),
48
+ }
41
49
42
50
p .currentTokenSecret , p .previousTokenSecret = make ([]byte , 20 ), make ([]byte , 20 )
43
51
_ , err := rand .Read (p .currentTokenSecret )
@@ -56,6 +64,7 @@ func (p *Protocol) Start() {
56
64
p .started = true
57
65
58
66
p .transport .Start ()
67
+ go p .printStats ()
59
68
go p .updateTokenSecret ()
60
69
}
61
70
@@ -67,7 +76,113 @@ func (p *Protocol) Terminate() {
67
76
p .transport .Terminate ()
68
77
}
69
78
79
+ //statistics
80
+ type protocolStats struct {
81
+ sync.RWMutex
82
+ messageTypeCount map [string ]map [string ]int //type=>subtype=>count
83
+ }
84
+
85
+ func (ps * protocolStats ) Reset () {
86
+ ps .Lock ()
87
+ defer ps .Unlock ()
88
+ ps .messageTypeCount = make (map [string ]map [string ]int )
89
+ }
90
+
91
+ type messageTypeCountOrdered struct {
92
+ messageType string
93
+ messageCount int
94
+ percentageOverTotal float64
95
+ subMessages orderedMessagesCount
96
+ }
97
+ type orderedMessagesCount []* messageTypeCountOrdered
98
+
99
+ func (omc orderedMessagesCount ) Len () int {
100
+ return len (omc )
101
+ }
102
+ func (omc orderedMessagesCount ) Swap (i , j int ) {
103
+ omc [i ], omc [j ] = omc [j ], omc [i ]
104
+ }
105
+ func (omc orderedMessagesCount ) Less (i , j int ) bool {
106
+ return omc [i ].messageCount > omc [j ].messageCount
107
+ }
108
+ func (omc orderedMessagesCount ) CalculatePercentagesOverTotal (totalMessages int ) {
109
+ for _ , mtco := range omc {
110
+ if mtco .subMessages != nil && len (mtco .subMessages ) > 0 {
111
+ mtco .subMessages .CalculatePercentagesOverTotal (totalMessages )
112
+ }
113
+ mtco .percentageOverTotal = util .RoundToDecimal (
114
+ (float64 (mtco .messageCount )/ float64 (totalMessages ))* 100 , 2 )
115
+ }
116
+ }
117
+ func (omc orderedMessagesCount ) Sort () {
118
+ for _ , mtco := range omc {
119
+ if mtco .subMessages != nil && len (mtco .subMessages ) > 0 {
120
+ mtco .subMessages .Sort ()
121
+ }
122
+ }
123
+ sort .Sort (omc )
124
+ }
125
+ func (omc orderedMessagesCount ) String () string {
126
+ /*
127
+ string concatenation is slow, so a bytes.Buffer would be better. But, this is called once every few seconds, so this won't
128
+ be a problem and it will be much easier to write down and read
129
+ */
130
+ mostReceivedMessageTypes := ""
131
+ for mIdx , m := range omc {
132
+ if mIdx > 0 {
133
+ mostReceivedMessageTypes += ", "
134
+ }
135
+ mostReceivedMessageTypes += m .messageType
136
+ mostReceivedMessageTypes +=
137
+ " (" + strconv .Itoa (m .messageCount ) + ", " + strconv .FormatFloat (m .percentageOverTotal , 'f' , - 1 , 64 ) + "%)"
138
+
139
+ if m .subMessages != nil && len (m .subMessages ) > 0 {
140
+ //add stats for submessages unless there is only 1 submessage with len 0 (empty)
141
+ if ! (len (m .subMessages ) == 1 && len (m .subMessages [0 ].messageType ) == 0 ) {
142
+ mostReceivedMessageTypes += "[ " + m .subMessages .String () + " ]"
143
+ }
144
+ }
145
+ }
146
+ return mostReceivedMessageTypes
147
+ }
148
+ func (p * Protocol ) printStats () {
149
+ for {
150
+ time .Sleep (StatsPrintClock )
151
+ p .stats .RLock ()
152
+ orderedMessages := make (orderedMessagesCount , 0 , len (p .stats .messageTypeCount ))
153
+ totalMessages := 0
154
+ for mType , mSubTypes := range p .stats .messageTypeCount {
155
+ mCount := 0
156
+ orderedSubMessages := make (orderedMessagesCount , 0 , len (mSubTypes ))
157
+ for mSubType , mSubCount := range mSubTypes {
158
+ mCount += mSubCount
159
+ totalMessages += mSubCount
160
+ orderedSubMessages = append (orderedSubMessages , & messageTypeCountOrdered {
161
+ messageType : mSubType ,
162
+ messageCount : mSubCount ,
163
+ })
164
+ }
165
+ orderedMessages = append (orderedMessages , & messageTypeCountOrdered {
166
+ messageType : mType ,
167
+ messageCount : mCount ,
168
+ subMessages : orderedSubMessages ,
169
+ })
170
+ }
171
+ p .stats .RUnlock ()
172
+ orderedMessages .CalculatePercentagesOverTotal (totalMessages )
173
+ orderedMessages .Sort ()
174
+
175
+ zap .L ().Info ("Protocol stats (on " + StatsPrintClock .String ()+ "):" ,
176
+ zap .String ("message type" , orderedMessages .String ()),
177
+ )
178
+
179
+ p .stats .Reset ()
180
+ }
181
+ }
182
+
70
183
func (p * Protocol ) onMessage (msg * Message , addr * net.UDPAddr ) {
184
+ temporaryQ := msg .Q
185
+
71
186
switch msg .Y {
72
187
case "q" :
73
188
switch msg .Q {
@@ -140,6 +255,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
140
255
//
141
256
// sample_infohashes > get_peers > find_node > ping / announce_peer
142
257
if len (msg .R .Samples ) != 0 { // The message should be a sample_infohashes response.
258
+ temporaryQ = "sample_infohashes"
143
259
if ! validateSampleInfohashesResponseMessage (msg ) {
144
260
// zap.L().Debug("An invalid sample_infohashes response received!")
145
261
return
@@ -148,6 +264,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
148
264
p .eventHandlers .OnSampleInfohashesResponse (msg , addr )
149
265
}
150
266
} else if len (msg .R .Token ) != 0 { // The message should be a get_peers response.
267
+ temporaryQ = "get_peers"
151
268
if ! validateGetPeersResponseMessage (msg ) {
152
269
// zap.L().Debug("An invalid get_peers response received!")
153
270
return
@@ -156,6 +273,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
156
273
p .eventHandlers .OnGetPeersResponse (msg , addr )
157
274
}
158
275
} else if len (msg .R .Nodes ) != 0 { // The message should be a find_node response.
276
+ temporaryQ = "find_node"
159
277
if ! validateFindNodeResponseMessage (msg ) {
160
278
// zap.L().Debug("An invalid find_node response received!")
161
279
return
@@ -164,6 +282,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
164
282
p .eventHandlers .OnFindNodeResponse (msg , addr )
165
283
}
166
284
} else { // The message should be a ping or an announce_peer response.
285
+ temporaryQ = "ping_or_announce"
167
286
if ! validatePingORannouncePeerResponseMessage (msg ) {
168
287
// zap.L().Debug("An invalid ping OR announce_peer response received!")
169
288
return
@@ -182,6 +301,14 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
182
301
zap.String("type", msg.Y))
183
302
*/
184
303
}
304
+
305
+ //let's update stats at the end so that in case of an "r" message the previous switch case can update the temporaryQ field
306
+ p .stats .Lock ()
307
+ if _ , ok := p .stats .messageTypeCount [msg .Y ]; ! ok {
308
+ p .stats .messageTypeCount [msg .Y ] = make (map [string ]int )
309
+ }
310
+ p .stats .messageTypeCount [msg .Y ][temporaryQ ] ++
311
+ p .stats .Unlock ()
185
312
}
186
313
187
314
func (p * Protocol ) SendMessage (msg * Message , addr * net.UDPAddr ) {
@@ -344,7 +471,7 @@ func validateGetPeersResponseMessage(msg *Message) bool {
344
471
func validateSampleInfohashesResponseMessage (msg * Message ) bool {
345
472
return len (msg .R .ID ) == 20 &&
346
473
msg .R .Interval >= 0 &&
347
- // TODO: check for nodes
474
+ // TODO: check for nodes
348
475
msg .R .Num >= 0 &&
349
476
len (msg .R .Samples )% 20 == 0
350
477
}
0 commit comments