@@ -3,7 +3,10 @@ package mainline
3
3
import (
4
4
"crypto/rand"
5
5
"crypto/sha1"
6
+ "github.com/boramalper/magnetico/pkg/util"
6
7
"net"
8
+ "sort"
9
+ "strconv"
7
10
"sync"
8
11
"time"
9
12
@@ -16,6 +19,8 @@ type Protocol struct {
16
19
transport * Transport
17
20
eventHandlers ProtocolEventHandlers
18
21
started bool
22
+
23
+ stats protocolStats
19
24
}
20
25
21
26
type ProtocolEventHandlers struct {
@@ -38,6 +43,9 @@ func NewProtocol(laddr string, eventHandlers ProtocolEventHandlers) (p *Protocol
38
43
p = new (Protocol )
39
44
p .eventHandlers = eventHandlers
40
45
p .transport = NewTransport (laddr , p .onMessage , p .eventHandlers .OnCongestion )
46
+ p .stats = protocolStats {
47
+ messageTypeCount : make (map [string ]map [string ]int ),
48
+ }
41
49
42
50
p .currentTokenSecret , p .previousTokenSecret = make ([]byte , 20 ), make ([]byte , 20 )
43
51
_ , err := rand .Read (p .currentTokenSecret )
@@ -56,6 +64,7 @@ func (p *Protocol) Start() {
56
64
p .started = true
57
65
58
66
p .transport .Start ()
67
+ go p .printStats ()
59
68
go p .updateTokenSecret ()
60
69
}
61
70
@@ -67,7 +76,114 @@ func (p *Protocol) Terminate() {
67
76
p .transport .Terminate ()
68
77
}
69
78
79
+ //statistics
80
+ type protocolStats struct {
81
+ sync.RWMutex
82
+ messageTypeCount map [string ]map [string ]int //type=>subtype=>count
83
+ }
84
+
85
+ func (ps * protocolStats ) Reset () {
86
+ ps .Lock ()
87
+ defer ps .Unlock ()
88
+ ps .messageTypeCount = make (map [string ]map [string ]int )
89
+ }
90
+
91
+ type messageTypeCountOrdered struct {
92
+ messageType string
93
+ messageCount int
94
+ percentageOverTotal float64
95
+ subMessages orderedMessagesCount
96
+ }
97
+ type orderedMessagesCount []* messageTypeCountOrdered
98
+
99
+
100
+ func (omc orderedMessagesCount ) Len () int {
101
+ return len (omc )
102
+ }
103
+ func (omc orderedMessagesCount ) Swap (i , j int ) {
104
+ omc [i ], omc [j ] = omc [j ], omc [i ]
105
+ }
106
+ func (omc orderedMessagesCount ) Less (i , j int ) bool {
107
+ return omc [i ].messageCount > omc [j ].messageCount
108
+ }
109
+ func (omc orderedMessagesCount ) CalculatePercentagesOverTotal (totalMessages int ) {
110
+ for _ , mtco := range omc {
111
+ if mtco .subMessages != nil && len (mtco .subMessages ) > 0 {
112
+ mtco .subMessages .CalculatePercentagesOverTotal (totalMessages )
113
+ }
114
+ mtco .percentageOverTotal = util .RoundToDecimal (
115
+ (float64 (mtco .messageCount )/ float64 (totalMessages ))* 100 , 2 )
116
+ }
117
+ }
118
+ func (omc orderedMessagesCount ) Sort () {
119
+ for _ , mtco := range omc {
120
+ if mtco .subMessages != nil && len (mtco .subMessages ) > 0 {
121
+ mtco .subMessages .Sort ()
122
+ }
123
+ }
124
+ sort .Sort (omc )
125
+ }
126
+ func (omc orderedMessagesCount ) String () string {
127
+ /*
128
+ string concatenation is slow, so a bytes.Buffer would be better. But, this is called once every few seconds, so this won't
129
+ be a problem and it will be much easier to write down and read
130
+ */
131
+ mostReceivedMessageTypes := ""
132
+ for mIdx , m := range omc {
133
+ if mIdx > 0 {
134
+ mostReceivedMessageTypes += ", "
135
+ }
136
+ mostReceivedMessageTypes += m .messageType
137
+ mostReceivedMessageTypes +=
138
+ " (" + strconv .Itoa (m .messageCount ) + ", " + strconv .FormatFloat (m .percentageOverTotal , 'f' , - 1 , 64 ) + "%)"
139
+
140
+ if m .subMessages != nil && len (m .subMessages ) > 0 {
141
+ //add stats for submessages unless there is only 1 submessage with len 0 (empty)
142
+ if ! (len (m .subMessages ) == 1 && len (m .subMessages [0 ].messageType ) == 0 ) {
143
+ mostReceivedMessageTypes += "[ " + m .subMessages .String () + " ]"
144
+ }
145
+ }
146
+ }
147
+ return mostReceivedMessageTypes
148
+ }
149
+ func (p * Protocol ) printStats () {
150
+ for {
151
+ time .Sleep (StatsPrintClock )
152
+ p .stats .RLock ()
153
+ orderedMessages := make (orderedMessagesCount , 0 , len (p .stats .messageTypeCount ))
154
+ totalMessages := 0
155
+ for mType , mSubTypes := range p .stats .messageTypeCount {
156
+ mCount := 0
157
+ orderedSubMessages := make (orderedMessagesCount , 0 , len (mSubTypes ))
158
+ for mSubType , mSubCount := range mSubTypes {
159
+ mCount += mSubCount
160
+ totalMessages += mSubCount
161
+ orderedSubMessages = append (orderedSubMessages , & messageTypeCountOrdered {
162
+ messageType : mSubType ,
163
+ messageCount : mSubCount ,
164
+ })
165
+ }
166
+ orderedMessages = append (orderedMessages , & messageTypeCountOrdered {
167
+ messageType : mType ,
168
+ messageCount : mCount ,
169
+ subMessages : orderedSubMessages ,
170
+ })
171
+ }
172
+ p .stats .RUnlock ()
173
+ orderedMessages .CalculatePercentagesOverTotal (totalMessages )
174
+ orderedMessages .Sort ()
175
+
176
+ zap .L ().Info ("Protocol stats (on " + StatsPrintClock .String ()+ "):" ,
177
+ zap .String ("message type" , orderedMessages .String ()),
178
+ )
179
+
180
+ p .stats .Reset ()
181
+ }
182
+ }
183
+
70
184
func (p * Protocol ) onMessage (msg * Message , addr * net.UDPAddr ) {
185
+ temporaryQ := msg .Q
186
+
71
187
switch msg .Y {
72
188
case "q" :
73
189
switch msg .Q {
@@ -140,6 +256,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
140
256
//
141
257
// sample_infohashes > get_peers > find_node > ping / announce_peer
142
258
if len (msg .R .Samples ) != 0 { // The message should be a sample_infohashes response.
259
+ temporaryQ = "sample_infohashes"
143
260
if ! validateSampleInfohashesResponseMessage (msg ) {
144
261
// zap.L().Debug("An invalid sample_infohashes response received!")
145
262
return
@@ -148,6 +265,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
148
265
p .eventHandlers .OnSampleInfohashesResponse (msg , addr )
149
266
}
150
267
} else if len (msg .R .Token ) != 0 { // The message should be a get_peers response.
268
+ temporaryQ = "get_peers"
151
269
if ! validateGetPeersResponseMessage (msg ) {
152
270
// zap.L().Debug("An invalid get_peers response received!")
153
271
return
@@ -156,6 +274,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
156
274
p .eventHandlers .OnGetPeersResponse (msg , addr )
157
275
}
158
276
} else if len (msg .R .Nodes ) != 0 { // The message should be a find_node response.
277
+ temporaryQ = "find_node"
159
278
if ! validateFindNodeResponseMessage (msg ) {
160
279
// zap.L().Debug("An invalid find_node response received!")
161
280
return
@@ -164,6 +283,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
164
283
p .eventHandlers .OnFindNodeResponse (msg , addr )
165
284
}
166
285
} else { // The message should be a ping or an announce_peer response.
286
+ temporaryQ = "ping_or_announce"
167
287
if ! validatePingORannouncePeerResponseMessage (msg ) {
168
288
// zap.L().Debug("An invalid ping OR announce_peer response received!")
169
289
return
@@ -182,6 +302,14 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
182
302
zap.String("type", msg.Y))
183
303
*/
184
304
}
305
+
306
+ //let's update stats at the end so that in case of an "r" message the previous switch case can update the temporaryQ field
307
+ p .stats .Lock ()
308
+ if _ , ok := p .stats .messageTypeCount [msg .Y ]; ! ok {
309
+ p .stats .messageTypeCount [msg .Y ] = make (map [string ]int )
310
+ }
311
+ p .stats .messageTypeCount [msg .Y ][temporaryQ ]++
312
+ p .stats .Unlock ()
185
313
}
186
314
187
315
func (p * Protocol ) SendMessage (msg * Message , addr * net.UDPAddr ) {
0 commit comments