@@ -3,7 +3,10 @@ package mainline
3
3
import (
4
4
"crypto/rand"
5
5
"crypto/sha1"
6
+ "github.com/boramalper/magnetico/pkg/util"
6
7
"net"
8
+ "sort"
9
+ "strconv"
7
10
"sync"
8
11
"time"
9
12
@@ -16,6 +19,8 @@ type Protocol struct {
16
19
transport * Transport
17
20
eventHandlers ProtocolEventHandlers
18
21
started bool
22
+
23
+ stats protocolStats
19
24
}
20
25
21
26
type ProtocolEventHandlers struct {
@@ -38,6 +43,9 @@ func NewProtocol(laddr string, eventHandlers ProtocolEventHandlers) (p *Protocol
38
43
p = new (Protocol )
39
44
p .eventHandlers = eventHandlers
40
45
p .transport = NewTransport (laddr , p .onMessage , p .eventHandlers .OnCongestion )
46
+ p .stats = protocolStats {
47
+ messageTypeCount :make (map [string ]map [string ]int ),
48
+ }
41
49
42
50
p .currentTokenSecret , p .previousTokenSecret = make ([]byte , 20 ), make ([]byte , 20 )
43
51
_ , err := rand .Read (p .currentTokenSecret )
@@ -56,6 +64,7 @@ func (p *Protocol) Start() {
56
64
p .started = true
57
65
58
66
p .transport .Start ()
67
+ go p .printStats ()
59
68
go p .updateTokenSecret ()
60
69
}
61
70
@@ -67,7 +76,110 @@ func (p *Protocol) Terminate() {
67
76
p .transport .Terminate ()
68
77
}
69
78
79
+ //statistics
80
+ type protocolStats struct {
81
+ sync.RWMutex
82
+ messageTypeCount map [string ]map [string ]int //type=>subtype=>count
83
+ }
84
+ func (ps * protocolStats ) Reset (){
85
+ ps .Lock ()
86
+ defer ps .Unlock ()
87
+ ps .messageTypeCount = make (map [string ]map [string ]int )
88
+ }
89
+ type messageTypeCountOrdered struct {
90
+ messageType string
91
+ messageCount int
92
+ percentageOverTotal float64
93
+ subMessages orderedMessagesCount
94
+ }
95
+ type orderedMessagesCount []* messageTypeCountOrdered
96
+ func (omc orderedMessagesCount ) Len () int {
97
+ return len (omc )
98
+ }
99
+ func (omc orderedMessagesCount ) Swap (i , j int ) {
100
+ omc [i ], omc [j ] = omc [j ], omc [i ]
101
+ }
102
+ func (omc orderedMessagesCount ) Less (i , j int ) bool {
103
+ return omc [i ].messageCount > omc [j ].messageCount
104
+ }
105
+ func (omc orderedMessagesCount )CalculatePercentagesOverTotal (totalMessages int ){
106
+ for _ ,mtco := range omc {
107
+ if mtco .subMessages != nil && len (mtco .subMessages ) > 0 {
108
+ mtco .subMessages .CalculatePercentagesOverTotal (totalMessages )
109
+ }
110
+ mtco .percentageOverTotal = util .RoundToDecimal (
111
+ (float64 (mtco .messageCount ) / float64 (totalMessages )) * 100 ,2 )
112
+ }
113
+ }
114
+ func (omc orderedMessagesCount ) Sort (){
115
+ for _ ,mtco := range omc {
116
+ if mtco .subMessages != nil && len (mtco .subMessages ) > 0 {
117
+ mtco .subMessages .Sort ()
118
+ }
119
+ }
120
+ sort .Sort (omc )
121
+ }
122
+ func (omc orderedMessagesCount ) String () string {
123
+ /*
124
+ string concatenation is slow, so a bytes.Buffer would be better. But, this is called once every few seconds, so this won't
125
+ be a problem and it will be much easier to write down and read
126
+ */
127
+ mostReceivedMessageTypes := ""
128
+ for mIdx , m := range omc {
129
+ if mIdx > 0 {
130
+ mostReceivedMessageTypes += ", "
131
+ }
132
+ mostReceivedMessageTypes += m .messageType
133
+ mostReceivedMessageTypes +=
134
+ " (" + strconv .Itoa (m .messageCount ) + ", " + strconv .FormatFloat (m .percentageOverTotal , 'f' , - 1 , 64 ) + "%)"
135
+
136
+ if m .subMessages != nil && len (m .subMessages ) > 0 {
137
+ //add stats for submessages unless there is only 1 submessage with len 0 (empty)
138
+ if ! (len (m .subMessages ) == 1 && len (m .subMessages [0 ].messageType ) == 0 ) {
139
+ mostReceivedMessageTypes += "[ " + m .subMessages .String () + " ]"
140
+ }
141
+ }
142
+ }
143
+ return mostReceivedMessageTypes
144
+ }
145
+ func (p * Protocol ) printStats () {
146
+ for {
147
+ time .Sleep (StatsPrintClock )
148
+ p .stats .RLock ()
149
+ orderedMessages := make (orderedMessagesCount , 0 , len (p .stats .messageTypeCount ))
150
+ totalMessages := 0
151
+ for mType , mSubTypes := range p .stats .messageTypeCount {
152
+ mCount := 0
153
+ orderedSubMessages := make (orderedMessagesCount , 0 , len (mSubTypes ))
154
+ for mSubType , mSubCount := range mSubTypes {
155
+ mCount += mSubCount
156
+ totalMessages += mSubCount
157
+ orderedSubMessages = append (orderedSubMessages , & messageTypeCountOrdered {
158
+ messageType : mSubType ,
159
+ messageCount : mSubCount ,
160
+ })
161
+ }
162
+ orderedMessages = append (orderedMessages , & messageTypeCountOrdered {
163
+ messageType : mType ,
164
+ messageCount : mCount ,
165
+ subMessages : orderedSubMessages ,
166
+ })
167
+ }
168
+ p .stats .RUnlock ()
169
+ orderedMessages .CalculatePercentagesOverTotal (totalMessages )
170
+ orderedMessages .Sort ()
171
+
172
+ zap .L ().Info ("Protocol stats (on " + StatsPrintClock .String ()+ "):" ,
173
+ zap .String ("message type" , orderedMessages .String ()),
174
+ )
175
+
176
+ p .stats .Reset ()
177
+ }
178
+ }
179
+
70
180
func (p * Protocol ) onMessage (msg * Message , addr * net.UDPAddr ) {
181
+ temporaryQ := msg .Q
182
+
71
183
switch msg .Y {
72
184
case "q" :
73
185
switch msg .Q {
@@ -140,6 +252,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
140
252
//
141
253
// sample_infohashes > get_peers > find_node > ping / announce_peer
142
254
if len (msg .R .Samples ) != 0 { // The message should be a sample_infohashes response.
255
+ temporaryQ = "sample_infohashes"
143
256
if ! validateSampleInfohashesResponseMessage (msg ) {
144
257
// zap.L().Debug("An invalid sample_infohashes response received!")
145
258
return
@@ -148,6 +261,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
148
261
p .eventHandlers .OnSampleInfohashesResponse (msg , addr )
149
262
}
150
263
} else if len (msg .R .Token ) != 0 { // The message should be a get_peers response.
264
+ temporaryQ = "get_peers"
151
265
if ! validateGetPeersResponseMessage (msg ) {
152
266
// zap.L().Debug("An invalid get_peers response received!")
153
267
return
@@ -156,6 +270,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
156
270
p .eventHandlers .OnGetPeersResponse (msg , addr )
157
271
}
158
272
} else if len (msg .R .Nodes ) != 0 { // The message should be a find_node response.
273
+ temporaryQ = "find_node"
159
274
if ! validateFindNodeResponseMessage (msg ) {
160
275
// zap.L().Debug("An invalid find_node response received!")
161
276
return
@@ -164,6 +279,7 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
164
279
p .eventHandlers .OnFindNodeResponse (msg , addr )
165
280
}
166
281
} else { // The message should be a ping or an announce_peer response.
282
+ temporaryQ = "ping_or_announce"
167
283
if ! validatePingORannouncePeerResponseMessage (msg ) {
168
284
// zap.L().Debug("An invalid ping OR announce_peer response received!")
169
285
return
@@ -182,6 +298,14 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
182
298
zap.String("type", msg.Y))
183
299
*/
184
300
}
301
+
302
+ //let's update stats at the end so that in case of an "r" message the previous switch case can update the temporaryQ field
303
+ p .stats .Lock ()
304
+ if _ , ok := p .stats .messageTypeCount [msg .Y ] ; ! ok {
305
+ p .stats .messageTypeCount [msg .Y ] = make (map [string ]int )
306
+ }
307
+ p .stats .messageTypeCount [msg .Y ][temporaryQ ] ++
308
+ p .stats .Unlock ()
185
309
}
186
310
187
311
func (p * Protocol ) SendMessage (msg * Message , addr * net.UDPAddr ) {
0 commit comments