Skip to content

Commit 513ca0a

Browse files
committed
Add limit for the batch entry size
1 parent 7c70f2e commit 513ca0a

File tree

8 files changed

+9
-9
lines changed

8 files changed

+9
-9
lines changed

pkg/amqp/buffer.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
//OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2222
//SOFTWARE
2323

24-
2524
package amqp
2625

2726
import (

pkg/amqp/encode.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
//OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2222
//SOFTWARE
2323

24-
25-
2624
package amqp
2725

2826
import (

pkg/amqp/error_stdlib.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
//OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2222
//SOFTWARE
2323

24-
2524
//go:build !pkgerrors
2625
// +build !pkgerrors
2726

pkg/amqp/types.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
//OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2222
//SOFTWARE
2323

24-
2524
package amqp
2625

2726
import (

pkg/stream/client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -464,9 +464,9 @@ func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (
464464
minBatchPublishingDelay, maxBatchPublishingDelay)
465465
}
466466

467-
if options.SubEntrySize < minSubEntrySize {
468-
return nil, fmt.Errorf("SubEntrySize value must equal or bigger than %d",
469-
minSubEntrySize)
467+
if options.SubEntrySize < minSubEntrySize || options.SubEntrySize > maxSubEntrySize {
468+
return nil, fmt.Errorf("SubEntrySize values must be between %d and %d",
469+
minSubEntrySize, maxSubEntrySize)
470470
}
471471

472472
if !options.isSubEntriesBatching() {

pkg/stream/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ const (
9090
maxBatchSize = 10_000
9191

9292
minSubEntrySize = 1
93+
maxSubEntrySize = 65535
9394

9495
minBatchPublishingDelay = 50
9596
maxBatchPublishingDelay = 500

pkg/stream/producer.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ func (cs *ConfirmationStatus) GetMessage() message.StreamMessage {
4545
return cs.message
4646
}
4747

48-
4948
func (cs *ConfirmationStatus) GetErrorCode() uint16 {
5049
return cs.errorCode
5150
}

pkg/stream/producer_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,11 @@ var _ = Describe("Streaming Producers", func() {
485485
SetSubEntrySize(1).SetCompression(Compression{}.Gzip()))
486486
Expect(err).To(HaveOccurred())
487487

488+
_, err = env.NewProducer(testProducerStream, &ProducerOptions{
489+
SubEntrySize: 65_539,
490+
})
491+
Expect(err).To(HaveOccurred())
492+
488493
err = env.Close()
489494
Expect(err).NotTo(HaveOccurred())
490495
})

0 commit comments

Comments
 (0)