Skip to content

Commit ed7d498

Browse files
authored
fix: improve zero queue consumer support for partitioned topics (#1424)
1 parent d19a9f8 commit ed7d498

File tree

3 files changed

+80
-26
lines changed

3 files changed

+80
-26
lines changed

pulsar/consumer_impl.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,9 @@ import (
2323
"fmt"
2424
"math/rand"
2525
"strconv"
26-
"strings"
2726
"sync"
2827
"time"
2928

30-
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
31-
3229
"github.com/apache/pulsar-client-go/pulsar/crypto"
3330
"github.com/apache/pulsar-client-go/pulsar/internal"
3431
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
@@ -269,11 +266,6 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
269266
return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
270267
}
271268

272-
if len(partitions) == 1 && options.EnableZeroQueueConsumer &&
273-
strings.Contains(partitions[0], utils.PARTITIONEDTOPICSUFFIX) {
274-
return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
275-
}
276-
277269
if len(partitions) == 1 && options.EnableZeroQueueConsumer {
278270
return newZeroConsumer(client, options, topic, messageCh, dlq, rlq, disableForceTopicCreation)
279271
}

pulsar/consumer_zero_queue.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,11 @@ func newZeroConsumer(client *client, options ConsumerOptions, topic string,
6666
consumerName: options.Name,
6767
metrics: client.metrics.GetLeveledMetrics(topic),
6868
}
69-
opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, 0, zc.options)
69+
tn, err := internal.ParseTopicName(topic)
70+
if err != nil {
71+
return nil, err
72+
}
73+
opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, tn.Partition, zc.options)
7074
conn, err := newPartitionConsumer(zc, zc.client, opts, zc.messageCh, zc.dlq, zc.metrics)
7175
if err != nil {
7276
return nil, err
@@ -142,11 +146,14 @@ func (z *zeroQueueConsumer) Ack(m Message) error {
142146

143147
func (z *zeroQueueConsumer) checkMsgIDPartition(msgID MessageID) error {
144148
partition := msgID.PartitionIdx()
145-
if partition != 0 {
146-
z.log.Errorf("invalid partition index %d expected a partition equal to 0",
147-
partition)
148-
return fmt.Errorf("invalid partition index %d expected a partition equal to 0",
149-
partition)
149+
if partition == 0 || partition == -1 {
150+
return nil
151+
}
152+
if partition != z.pc.partitionIdx {
153+
z.log.Errorf("invalid partition index %d expected a partition equal to %d",
154+
partition, z.pc.partitionIdx)
155+
return fmt.Errorf("invalid partition index %d expected a partition equal to %d",
156+
partition, z.pc.partitionIdx)
150157
}
151158
return nil
152159
}

pulsar/consumer_zero_queue_test.go

Lines changed: 67 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ func TestNormalZeroQueueConsumer(t *testing.T) {
115115
assert.Equal(t, "pulsar", msg.Key())
116116
assert.Equal(t, expectProperties, msg.Properties())
117117
// ack message
118-
consumer.Ack(msg)
118+
err = consumer.Ack(msg)
119+
assert.Nil(t, err)
119120
log.Printf("receive message: %s", msg.ID().String())
120121
}
121122
err = consumer.Unsubscribe()
@@ -228,7 +229,8 @@ func TestReconnectConsumer(t *testing.T) {
228229
assert.Equal(t, "pulsar", msg.Key())
229230
assert.Equal(t, expectProperties, msg.Properties())
230231
// ack message
231-
consumer.Ack(msg)
232+
err = consumer.Ack(msg)
233+
assert.Nil(t, err)
232234
log.Printf("receive message: %s", msg.ID().String())
233235
}
234236
err = consumer.Unsubscribe()
@@ -341,7 +343,7 @@ func TestPartitionZeroQueueConsumer(t *testing.T) {
341343
assert.Nil(t, consumer)
342344
assert.Error(t, err, "ZeroQueueConsumer is not supported for partitioned topics")
343345
}
344-
func TestOnePartitionZeroQueueConsumer(t *testing.T) {
346+
func TestSpecifiedPartitionZeroQueueConsumer(t *testing.T) {
345347
client, err := NewClient(ClientOptions{
346348
URL: lookupURL,
347349
})
@@ -350,17 +352,65 @@ func TestOnePartitionZeroQueueConsumer(t *testing.T) {
350352
defer client.Close()
351353

352354
topic := newTopicName()
353-
err = createPartitionedTopic(topic, 1)
355+
ctx := context.Background()
356+
err = createPartitionedTopic(topic, 2)
357+
assert.Nil(t, err)
358+
topics, err := client.TopicPartitions(topic)
354359
assert.Nil(t, err)
355360

356361
// create consumer
357362
consumer, err := client.Subscribe(ConsumerOptions{
358-
Topic: topic,
363+
Topic: topics[1],
359364
SubscriptionName: "my-sub",
360365
EnableZeroQueueConsumer: true,
361366
})
362-
assert.Nil(t, consumer)
363-
assert.Error(t, err, "ZeroQueueConsumer is not supported for partitioned topics")
367+
assert.Nil(t, err)
368+
_, ok := consumer.(*zeroQueueConsumer)
369+
assert.True(t, ok)
370+
defer consumer.Close()
371+
372+
// create producer
373+
producer, err := client.CreateProducer(ProducerOptions{
374+
Topic: topics[1],
375+
DisableBatching: false,
376+
})
377+
assert.Nil(t, err)
378+
defer producer.Close()
379+
380+
// send 10 messages
381+
for i := 0; i < 10; i++ {
382+
msg, err := producer.Send(ctx, &ProducerMessage{
383+
Payload: []byte(fmt.Sprintf("hello-%d", i)),
384+
Key: "pulsar",
385+
Properties: map[string]string{
386+
"key-1": "pulsar-1",
387+
},
388+
})
389+
assert.Nil(t, err)
390+
log.Printf("send message: %s", msg.String())
391+
}
392+
393+
// receive 10 messages
394+
for i := 0; i < 10; i++ {
395+
msg, err := consumer.Receive(context.Background())
396+
if err != nil {
397+
log.Fatal(err)
398+
}
399+
400+
expectMsg := fmt.Sprintf("hello-%d", i)
401+
expectProperties := map[string]string{
402+
"key-1": "pulsar-1",
403+
}
404+
assert.Equal(t, []byte(expectMsg), msg.Payload())
405+
assert.Equal(t, "pulsar", msg.Key())
406+
assert.Equal(t, expectProperties, msg.Properties())
407+
// ack message
408+
err = consumer.Ack(msg)
409+
assert.Nil(t, err)
410+
log.Printf("receive message: %s", msg.ID().String())
411+
}
412+
err = consumer.Unsubscribe()
413+
assert.Nil(t, err)
364414
}
365415

366416
func TestZeroQueueConsumerGetLastMessageIDs(t *testing.T) {
@@ -576,7 +626,8 @@ func TestZeroQueueConsumer_Nack(t *testing.T) {
576626

577627
if i%2 == 0 {
578628
// Only acks even messages
579-
consumer.Ack(msg)
629+
err = consumer.Ack(msg)
630+
assert.Nil(t, err)
580631
} else {
581632
// Fails to process odd messages
582633
consumer.Nack(msg)
@@ -591,7 +642,8 @@ func TestZeroQueueConsumer_Nack(t *testing.T) {
591642
assert.Nil(t, err)
592643
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
593644

594-
consumer.Ack(msg)
645+
err = consumer.Ack(msg)
646+
assert.Nil(t, err)
595647
}
596648
}
597649

@@ -641,7 +693,8 @@ func TestZeroQueueConsumer_Seek(t *testing.T) {
641693
msg, err := consumer.Receive(ctx)
642694
assert.Nil(t, err)
643695
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
644-
consumer.Ack(msg)
696+
err = consumer.Ack(msg)
697+
assert.Nil(t, err)
645698
}
646699

647700
err = consumer.Seek(seekID)
@@ -698,7 +751,8 @@ func TestZeroQueueConsumer_SeekByTime(t *testing.T) {
698751
msg, err := consumer.Receive(ctx)
699752
assert.Nil(t, err)
700753
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
701-
consumer.Ack(msg)
754+
err = consumer.Ack(msg)
755+
assert.Nil(t, err)
702756
}
703757

704758
currentTimestamp := time.Now()
@@ -711,6 +765,7 @@ func TestZeroQueueConsumer_SeekByTime(t *testing.T) {
711765
msg, err := consumer.Receive(ctx)
712766
assert.Nil(t, err)
713767
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
714-
consumer.Ack(msg)
768+
err = consumer.Ack(msg)
769+
assert.Nil(t, err)
715770
}
716771
}

0 commit comments

Comments
 (0)