Skip to content

Commit fb3ab47

Browse files
Fixed bug discussed in #52 (#53)
Signed-off-by: Jeff Chauvin <chauvinj@us.ibm.com> Signed-off-by: Jeff Chauvin <chauvinj@us.ibm.com>
1 parent 321265c commit fb3ab47

File tree

2 files changed

+57
-3
lines changed

2 files changed

+57
-3
lines changed

fluent/client/ws_client.go

+17-3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ SOFTWARE.
2525
package client
2626

2727
import (
28+
"bytes"
2829
"crypto/tls"
2930
"errors"
3031
"net/http"
@@ -265,10 +266,14 @@ func (c *WSClient) Reconnect() (err error) {
265266

266267
// Send sends a single msgp.Encodable across the wire.
267268
func (c *WSClient) Send(e protocol.ChunkEncoder) error {
269+
var (
270+
err error
271+
rawMessageData bytes.Buffer
272+
)
268273
// Check for an async connection error and return it here.
269274
// In most cases, the client will not care about reading from
270275
// the connection, so checking for the error here is sufficient.
271-
if err := c.getErr(); err != nil {
276+
if err = c.getErr(); err != nil {
272277
return err // TODO: wrap this
273278
}
274279

@@ -278,8 +283,17 @@ func (c *WSClient) Send(e protocol.ChunkEncoder) error {
278283
return errors.New("no active session")
279284
}
280285

281-
// msgp.Encode makes use of object pool to decrease allocations
282-
return msgp.Encode(session.Connection, e)
286+
err = msgp.Encode(&rawMessageData, e)
287+
if err != nil {
288+
return err
289+
}
290+
291+
bytesData := rawMessageData.Bytes()
292+
// Write function does not accurately return the number of bytes written
293+
// so it would be ineffective to compare
294+
_, err = c.session.Connection.Write(bytesData)
295+
296+
return err
283297
}
284298

285299
// SendRaw sends an array of bytes across the wire.

fluent/client/ws_client_test.go

+40
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"bytes"
2929
"crypto/tls"
3030
"errors"
31+
"math/rand"
3132
"net/http"
3233
"net/http/httptest"
3334
"strings"
@@ -46,6 +47,7 @@ import (
4647
"github.com/gorilla/websocket"
4748
. "github.com/onsi/ginkgo/v2"
4849
. "github.com/onsi/gomega"
50+
"github.com/tinylib/msgp/msgp"
4951
)
5052

5153
var _ = Describe("IAMAuthInfo", func() {
@@ -303,6 +305,44 @@ var _ = Describe("WSClient", func() {
303305
Expect(bytes.Equal(bits, writtenbits)).To(BeTrue())
304306
})
305307

308+
It("Sends the message", func() {
309+
msgBytes, _ := msg.MarshalMsg(nil)
310+
Expect(client.Send(&msg)).ToNot(HaveOccurred())
311+
312+
writtenBytes := conn.WriteArgsForCall(0)
313+
Expect(bytes.Equal(msgBytes, writtenBytes)).To(BeTrue())
314+
})
315+
316+
When("The message is large", func() {
317+
const charset = "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
318+
319+
var (
320+
expectedBytes int
321+
messageSize = 65536
322+
)
323+
324+
JustBeforeEach(func() {
325+
seededRand := rand.New(
326+
rand.NewSource(time.Now().UnixNano()))
327+
m := make([]byte, messageSize)
328+
for i := range m {
329+
m[i] = charset[seededRand.Intn(len(charset))]
330+
}
331+
msg.Record = m
332+
333+
var b bytes.Buffer
334+
Expect(msgp.Encode(&b, &msg)).ToNot(HaveOccurred())
335+
expectedBytes = len(b.Bytes())
336+
})
337+
338+
It("Sends the correct number of bits", func() {
339+
Expect(client.Send(&msg)).ToNot(HaveOccurred())
340+
Expect(conn.WriteCallCount()).To(Equal(1))
341+
writtenBytes := len(conn.WriteArgsForCall(0))
342+
Expect(writtenBytes).To(Equal(expectedBytes))
343+
})
344+
})
345+
306346
When("the connection is disconnected", func() {
307347
JustBeforeEach(func() {
308348
err := client.Disconnect()

0 commit comments

Comments
 (0)