
Reader gets an empty assignment and never recovers when the topic is auto-created #800

@arjantop-cai

Description


Describe the bug
When a topic is auto-created, the Reader gets stuck with zero assigned partitions and never starts consuming or rebalances. If the topic already exists in Kafka, there are no problems: the Reader is assigned partitions as expected and starts consuming.
The library logs "received empty assignments for group" and nothing happens afterwards.

Kafka Version
3.0.0 + KRaft

To Reproduce
Run Kafka with KRaft:

docker run -p9092:9092  arjancai/kafka-kraft:3.0.0

Run this sample program:

package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic: "debug-1",
		GroupID: "test",
		Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
			log.Printf(msg, args...)
		}),
	})
	defer r.Close()

	msg, err := r.ReadMessage(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	log.Println(string(msg.Value))
}

First run:

2021/11/27 17:46:21 entering loop for consumer group, test
2021/11/27 17:46:24 joined group test as member ___go_build_go_test_kafka@ARJAN-TOPOLOVEC.local (github.com/segmentio/kafka-go)-b45342c5-b377-499a-aeda-4dbd4258b606 in generation 6
2021/11/27 17:46:24 selected as leader for group, test
2021/11/27 17:46:24 using 'range' balancer to assign group, test
2021/11/27 17:46:24 found member: ___go_build_go_test_kafka@ARJAN-TOPOLOVEC.local (github.com/segmentio/kafka-go)-b45342c5-b377-499a-aeda-4dbd4258b606/[]byte(nil)
2021/11/27 17:46:24 joinGroup succeeded for response, test.  generationID=6, memberID=___go_build_go_test_kafka@ARJAN-TOPOLOVEC.local (github.com/segmentio/kafka-go)-b45342c5-b377-499a-aeda-4dbd4258b606
2021/11/27 17:46:24 Joined group test as member ___go_build_go_test_kafka@ARJAN-TOPOLOVEC.local (github.com/segmentio/kafka-go)-b45342c5-b377-499a-aeda-4dbd4258b606 in generation 6
2021/11/27 17:46:24 Syncing 1 assignments for generation 6 as member ___go_build_go_test_kafka@ARJAN-TOPOLOVEC.local (github.com/segmentio/kafka-go)-b45342c5-b377-499a-aeda-4dbd4258b606
2021/11/27 17:46:24 received empty assignments for group, test as member ___go_build_go_test_kafka@ARJAN-TOPOLOVEC.local (github.com/segmentio/kafka-go)-b45342c5-b377-499a-aeda-4dbd4258b606 for generation 6
2021/11/27 17:46:24 sync group finished for group, test
2021/11/27 17:46:24 subscribed to topics and partitions: map[]
2021/11/27 17:46:24 started heartbeat for group, test [3s]
2021/11/27 17:46:24 started commit for group test

Kafka broker logs:
[2021-11-27 16:46:21,760] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group test in Empty state. Created a new member id ___go_build_go_test_kafka@ARJAN-TOPOLOVEC.local (github.com/segmentio/kafka-go)-b45342c5-b377-499a-aeda-4dbd4258b606 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2021-11-27 16:46:21,760] INFO [GroupCoordinator 1]: Preparing to rebalance group test in state PreparingRebalance with old generation 5 (__consumer_offsets-48) (reason: Adding new member ___go_build_go_test_kafka@ARJAN-TOPOLOVEC.local (github.com/segmentio/kafka-go)-b45342c5-b377-499a-aeda-4dbd4258b606 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2021-11-27 16:46:24,769] INFO [GroupCoordinator 1]: Stabilized group test generation 6 (__consumer_offsets-48) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2021-11-27 16:46:24,788] INFO Sent auto-creation request for Set(debug-5) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
[2021-11-27 16:46:24,790] INFO [Controller 1] createTopics result(s): CreatableTopic(name='debug-5', numPartitions=1, replicationFactor=1, assignments=[], configs=[]): SUCCESS (org.apache.kafka.controller.ReplicationControlManager)
[2021-11-27 16:46:24,791] INFO [Controller 1] Created topic debug-5 with topic ID GpGLvA1BSPePuJe1cqZ74Q. (org.apache.kafka.controller.ReplicationControlManager)
[2021-11-27 16:46:24,791] INFO [Controller 1] Created partition debug-5-0 with topic ID GpGLvA1BSPePuJe1cqZ74Q and PartitionRegistration(replicas=[1], isr=[1], removingReplicas=[], addingReplicas=[], leader=1, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
[2021-11-27 16:46:24,793] INFO [GroupCoordinator 1]: Assignment received from leader ___go_build_go_test_kafka@ARJAN-TOPOLOVEC.local (github.com/segmentio/kafka-go)-b45342c5-b377-499a-aeda-4dbd4258b606 for group test for generation 6. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2021-11-27 16:46:24,822] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(debug-5-0) (kafka.server.ReplicaFetcherManager)
[2021-11-27 16:46:24,825] INFO [LogLoader partition=debug-5-0, dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log$)
[2021-11-27 16:46:24,826] INFO Created log for partition debug-5-0 in /tmp/kraft-combined-logs/debug-5-0 with properties {} (kafka.log.LogManager)
[2021-11-27 16:46:24,827] INFO [Partition debug-5-0 broker=1] No checkpointed highwatermark is found for partition debug-5-0 (kafka.cluster.Partition)
[2021-11-27 16:46:24,827] INFO [Partition debug-5-0 broker=1] Log loaded for partition debug-5-0 with initial high watermark 0 (kafka.cluster.Partition)

Second run:

2021/11/27 17:47:38 entering loop for consumer group, test
2021/11/27 17:48:06 joined group test as member ___go_build_go_test_kafka@ARJAN-TOPOLOVEC.local (github.com/segmentio/kafka-go)-e24f7b2a-51fb-43d0-a22e-466b220369dd in generation 7
2021/11/27 17:48:06 selected as leader for group, test
2021/11/27 17:48:06 using 'range' balancer to assign group, test
2021/11/27 17:48:06 found member: ___go_build_go_test_kafka@ARJAN-TOPOLOVEC.local (github.com/segmentio/kafka-go)-e24f7b2a-51fb-43d0-a22e-466b220369dd/[]byte(nil)
2021/11/27 17:48:06 found topic/partition: debug-5/0
2021/11/27 17:48:06 assigned member/topic/partitions ___go_build_go_test_kafka@ARJAN-TOPOLOVEC.local (github.com/segmentio/kafka-go)-e24f7b2a-51fb-43d0-a22e-466b220369dd/debug-5/[0]
2021/11/27 17:48:06 joinGroup succeeded for response, test.  generationID=7, memberID=___go_build_go_test_kafka@ARJAN-TOPOLOVEC.local (github.com/segmentio/kafka-go)-e24f7b2a-51fb-43d0-a22e-466b220369dd
2021/11/27 17:48:06 Joined group test as member ___go_build_go_test_kafka@ARJAN-TOPOLOVEC.local (github.com/segmentio/kafka-go)-e24f7b2a-51fb-43d0-a22e-466b220369dd in generation 7
2021/11/27 17:48:06 Syncing 1 assignments for generation 7 as member ___go_build_go_test_kafka@ARJAN-TOPOLOVEC.local (github.com/segmentio/kafka-go)-e24f7b2a-51fb-43d0-a22e-466b220369dd
2021/11/27 17:48:06 sync group finished for group, test
2021/11/27 17:48:06 subscribed to topics and partitions: map[{topic:debug-5 partition:0}:-2]
2021/11/27 17:48:06 initializing kafka reader for partition 0 of debug-5 starting at offset -2
2021/11/27 17:48:06 started heartbeat for group, test [3s]
2021/11/27 17:48:06 started commit for group test
2021/11/27 17:48:06 the kafka reader for partition 0 of debug-5 is seeking to offset 0
2021/11/27 17:48:15 no messages received from kafka within the allocated time for partition 0 of debug-5 at offset 0
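Comparing the two runs: in the first run the leader never logs a "found topic/partition" line, and the broker only receives the auto-creation request for debug-5 after it has stabilized generation 6. This suggests (an inference from the logs, not a confirmed diagnosis) that the range balancer ran against metadata with no partitions for the topic, produced an empty assignment, and nothing afterwards triggered a new rebalance. The sketch below is a simplified, hypothetical model of a range-style balancer; `rangeAssign` is an illustrative name, not kafka-go's actual code. It shows why an empty partition list at join time yields an empty assignment:

```go
package main

import (
	"fmt"
	"sort"
)

// rangeAssign is a simplified, hypothetical model of a range-style balancer:
// for each topic, the partitions are split into contiguous ranges across the
// sorted member IDs. It is NOT kafka-go's implementation.
func rangeAssign(members []string, partitionsByTopic map[string][]int) map[string]map[string][]int {
	sorted := append([]string(nil), members...)
	sort.Strings(sorted)

	assignments := make(map[string]map[string][]int, len(sorted))
	for _, m := range sorted {
		assignments[m] = map[string][]int{}
	}
	if len(sorted) == 0 {
		return assignments
	}

	for topic, partitions := range partitionsByTopic {
		n := len(sorted)
		per := len(partitions) / n   // partitions every member gets
		extra := len(partitions) % n // first `extra` members get one more
		start := 0
		for i, m := range sorted {
			count := per
			if i < extra {
				count++
			}
			if count > 0 {
				assignments[m][topic] = partitions[start : start+count]
			}
			start += count
		}
	}
	return assignments
}

func main() {
	members := []string{"member-a"}

	// First run: the topic did not exist yet when the leader read metadata,
	// so there are no partitions to hand out.
	before := rangeAssign(members, map[string][]int{"debug-5": nil})
	fmt.Println(before) // map[member-a:map[]]

	// Second run: debug-5 now exists with one partition.
	after := rangeAssign(members, map[string][]int{"debug-5": {0}})
	fmt.Println(after) // map[member-a:map[debug-5:[0]]]
}
```

In this model the empty result is not an error, which would be consistent with the first run logging "subscribed to topics and partitions: map[]" and then simply waiting.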

Expected behavior
The Reader is assigned a partition and starts consuming messages.

Additional context

  • May not be specific to KRaft; I did not test with ZooKeeper.
  • Does not occur with kafka-console-consumer:
kafka-console-consumer --bootstrap-server=localhost:9092 --group=kcc-test --topic=kcc-test

In that case the auto-creation log line appears first in the Kafka logs, before the consumer joins the group (in the kafka-go run above, auto-creation happens after the group join):

[2021-11-27 16:44:00,410] INFO Sent auto-creation request for Set(kcc-test) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
[2021-11-27 16:44:00,414] INFO [Controller 1] createTopics result(s): CreatableTopic(name='kcc-test', numPartitions=1, replicationFactor=1, assignments=[], configs=[]): SUCCESS (org.apache.kafka.controller.ReplicationControlManager)
[2021-11-27 16:44:00,414] INFO [Controller 1] Created topic kcc-test with topic ID KMv2gGTQR-OkdLnsNQxXDQ. (org.apache.kafka.controller.ReplicationControlManager)
[2021-11-27 16:44:00,414] INFO [Controller 1] Created partition kcc-test-0 with topic ID KMv2gGTQR-OkdLnsNQxXDQ and PartitionRegistration(replicas=[1], isr=[1], removingReplicas=[], addingReplicas=[], leader=1, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
[2021-11-27 16:44:00,426] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group kcc-test in Empty state. Created a new member id consumer-kcc-test-1-33768d2f-1223-42a5-a769-cf1911580e97 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2021-11-27 16:44:00,429] INFO [GroupCoordinator 1]: Preparing to rebalance group kcc-test in state PreparingRebalance with old generation 0 (__consumer_offsets-34) (reason: Adding new member consumer-kcc-test-1-33768d2f-1223-42a5-a769-cf1911580e97 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2021-11-27 16:44:00,441] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(kcc-test-0) (kafka.server.ReplicaFetcherManager)
[2021-11-27 16:44:00,443] INFO [LogLoader partition=kcc-test-0, dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log$)
[2021-11-27 16:44:00,443] INFO Created log for partition kcc-test-0 in /tmp/kraft-combined-logs/kcc-test-0 with properties {} (kafka.log.LogManager)
[2021-11-27 16:44:00,444] INFO [Partition kcc-test-0 broker=1] No checkpointed highwatermark is found for partition kcc-test-0 (kafka.cluster.Partition)
[2021-11-27 16:44:00,444] INFO [Partition kcc-test-0 broker=1] Log loaded for partition kcc-test-0 with initial high watermark 0 (kafka.cluster.Partition)
[2021-11-27 16:44:03,432] INFO [GroupCoordinator 1]: Stabilized group kcc-test generation 1 (__consumer_offsets-34) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2021-11-27 16:44:03,460] INFO [GroupCoordinator 1]: Assignment received from leader consumer-kcc-test-1-33768d2f-1223-42a5-a769-cf1911580e97 for group kcc-test for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
