@@ -50,13 +50,13 @@ func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Wri
50
50
rw .address = opts .Address
51
51
rw .stat .Name = "writer_" + strings .Replace (opts .Address , ":" , "_" , - 1 )
52
52
rw .client = client .NewRedisClient (ctx , opts .Address , opts .Username , opts .Password , opts .Tls , false )
53
- rw .ch = make (chan * entry.Entry , 1024 )
53
+ rw .ch = make (chan * entry.Entry , config . Opt . Advanced . PipelineCountLimit )
54
54
if opts .OffReply {
55
55
log .Infof ("turn off the reply of write" )
56
56
rw .offReply = true
57
57
rw .client .Send ("CLIENT" , "REPLY" , "OFF" )
58
58
} else {
59
- rw .chWaitReply = make (chan * entry.Entry , config .Opt .Advanced .PipelineCountLimit )
59
+ rw .chWaitReply = make (chan * entry.Entry , config .Opt .Advanced .PipelineCountLimit * 2 )
60
60
rw .chWaitWg .Add (1 )
61
61
go rw .processReply ()
62
62
}
@@ -75,40 +75,7 @@ func (w *redisStandaloneWriter) Close() {
75
75
func (w * redisStandaloneWriter ) StartWrite (ctx context.Context ) chan * entry.Entry {
76
76
w .chWg = sync.WaitGroup {}
77
77
w .chWg .Add (1 )
78
- timer := time .NewTicker (10 * time .Millisecond )
79
- go func () {
80
- for {
81
- select {
82
- case <- ctx .Done ():
83
- // do nothing until w.ch is closed
84
- case <- timer .C :
85
- w .client .Flush ()
86
- case e , ok := <- w .ch :
87
- if ! ok {
88
- w .client .Flush ()
89
- w .chWg .Done ()
90
- return
91
- }
92
- // switch db if we need
93
- if w .DbId != e .DbId {
94
- w .switchDbTo (e .DbId )
95
- }
96
- // send
97
- bytes := e .Serialize ()
98
- for e .SerializedSize + atomic .LoadInt64 (& w .stat .UnansweredBytes ) > config .Opt .Advanced .TargetRedisClientMaxQuerybufLen {
99
- time .Sleep (1 * time .Nanosecond )
100
- }
101
- log .Debugf ("[%s] send cmd. cmd=[%s]" , w .stat .Name , e .String ())
102
- if ! w .offReply {
103
- w .chWaitReply <- e
104
- atomic .AddInt64 (& w .stat .UnansweredBytes , e .SerializedSize )
105
- atomic .AddInt64 (& w .stat .UnansweredEntries , 1 )
106
- }
107
- w .client .SendBytesBuff (bytes )
108
- }
109
- }
110
- }()
111
-
78
+ go w .processWrite (ctx )
112
79
return w .ch
113
80
}
114
81
@@ -128,6 +95,47 @@ func (w *redisStandaloneWriter) switchDbTo(newDbId int) {
128
95
}
129
96
}
130
97
98
+ func (w * redisStandaloneWriter ) processWrite (ctx context.Context ) {
99
+ ticker := time .NewTicker (10 * time .Millisecond )
100
+ defer ticker .Stop ()
101
+ for {
102
+ select {
103
+ case <- ctx .Done ():
104
+ // do nothing until w.ch is closed
105
+ case <- ticker .C :
106
+ w .client .Flush ()
107
+ case e , ok := <- w .ch :
108
+ if ! ok {
109
+ // clean up and exit
110
+ w .client .Flush ()
111
+ w .chWg .Done ()
112
+ return
113
+ }
114
+ // switch db if we need
115
+ if w .DbId != e .DbId {
116
+ w .switchDbTo (e .DbId )
117
+ }
118
+ // send
119
+ bytes := e .Serialize ()
120
+ for e .SerializedSize + atomic .LoadInt64 (& w .stat .UnansweredBytes ) > config .Opt .Advanced .TargetRedisClientMaxQuerybufLen {
121
+ time .Sleep (1 * time .Nanosecond )
122
+ }
123
+ log .Debugf ("[%s] send cmd. cmd=[%s]" , w .stat .Name , e .String ())
124
+ if ! w .offReply {
125
+ select {
126
+ case w .chWaitReply <- e :
127
+ default :
128
+ w .client .Flush ()
129
+ w .chWaitReply <- e
130
+ }
131
+ atomic .AddInt64 (& w .stat .UnansweredBytes , e .SerializedSize )
132
+ atomic .AddInt64 (& w .stat .UnansweredEntries , 1 )
133
+ }
134
+ w .client .SendBytesBuff (bytes )
135
+ }
136
+ }
137
+ }
138
+
131
139
func (w * redisStandaloneWriter ) processReply () {
132
140
for e := range w .chWaitReply {
133
141
reply , err := w .client .Receive ()
0 commit comments