Skip to content

Commit 668a4e7

Browse files
committed
Update the heartbeat
valude on read socket
1 parent 3898dfa commit 668a4e7

File tree

2 files changed

+16
-9
lines changed

2 files changed

+16
-9
lines changed

pkg/stream/client.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,18 @@ func (c *Client) getTuneState() TuneState {
7979
c.mutex.Lock()
8080
defer c.mutex.Unlock()
8181
return c.tuneState
82+
}
8283

84+
func (c *Client) getLastHeartBeat() time.Time {
85+
c.mutex.Lock()
86+
defer c.mutex.Unlock()
87+
return c.lastHeartBeat
88+
}
89+
90+
func (c *Client) setLastHeartBeat(value time.Time) {
91+
c.mutex.Lock()
92+
defer c.mutex.Unlock()
93+
c.lastHeartBeat = value
8394
}
8495

8596
func (c *Client) connect() error {
@@ -313,23 +324,19 @@ func (c *Client) DeleteStream(streamName string) error {
313324
}
314325

315326
func (c *Client) heartBeat() {
316-
317327
ticker := time.NewTicker(60 * time.Second)
318-
319328
tickerHeatBeat := time.NewTicker(20 * time.Second)
320-
//defer tickerHeatBeat.Stop()
321329
resp := c.coordinator.NewResponseWitName("heartbeat")
322330
var heartBeatMissed int32
323331
go func() {
324332
for c.socket.isOpen() {
325333
<-tickerHeatBeat.C
326-
logs.LogDebug("Heart beat since: %s", time.Since(c.lastHeartBeat))
327-
if time.Since(c.lastHeartBeat) > 120*time.Second {
334+
if time.Since(c.getLastHeartBeat()) > time.Duration(c.tuneState.requestedHeartbeat)*time.Second {
328335
v := atomic.AddInt32(&heartBeatMissed, 1)
329336
logs.LogWarn("Missing heart beat: %d", v)
330-
if v > 3 {
337+
if v >= 2 {
331338
logs.LogWarn("Too many heartbeat missing: %d", v)
332-
//c.Close()
339+
c.Close()
333340
}
334341
} else {
335342
atomic.StoreInt32(&heartBeatMissed, 0)

pkg/stream/server_frame.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ func (c *Client) handleResponse() {
3232
_ = c.Close()
3333
break
3434
}
35+
c.lastHeartBeat = time.Now()
3536
readerProtocol.FrameLen = frameLen
3637
readerProtocol.CommandID = uShortExtractResponseCode(readUShort(buffer))
3738
readerProtocol.Version = readUShort(buffer)
@@ -92,7 +93,6 @@ func (c *Client) handleResponse() {
9293
{
9394

9495
c.handleHeartbeat()
95-
//logDebug("RECEIVED Heartbeat %d buff:%d \n", readerProtocol.CommandID, buffer.Buffered())
9696

9797
}
9898
case CommandQueryOffset:
@@ -479,5 +479,5 @@ func (c *Client) closeFrameHandler(readProtocol *ReaderProtocol, r *bufio.Reader
479479

480480
func (c *Client) handleHeartbeat() {
481481
logs.LogDebug("Heart beat received at %s", time.Now())
482-
c.lastHeartBeat = time.Now()
482+
c.setLastHeartBeat(time.Now())
483483
}

0 commit comments

Comments
 (0)