10
10
import java .util .List ;
11
11
import java .util .Properties ;
12
12
import java .util .UUID ;
13
- import java .util .stream .Collectors ;
14
13
import java .util .stream .Stream ;
15
14
import lombok .extern .slf4j .Slf4j ;
16
15
import lombok .val ;
@@ -32,7 +31,6 @@ public class KafkaConsumerGroupTests extends AbstractIntegrationTest {
32
31
@ Test
33
32
void shouldNotFoundWhenNoSuchConsumerGroupId () {
34
33
String groupId = "groupA" ;
35
- String expError = "The group id does not exist" ;
36
34
webTestClient
37
35
.delete ()
38
36
.uri ("/api/clusters/{clusterName}/consumer-groups/{groupId}" , LOCAL , groupId )
@@ -47,12 +45,13 @@ void shouldOkWhenConsumerGroupIsNotActive() {
47
45
48
46
//Create a consumer and subscribe to the topic
49
47
String groupId = UUID .randomUUID ().toString ();
50
- val consumer = createTestConsumerWithGroupId (groupId );
51
- consumer .subscribe (List .of (topicName ));
52
- consumer .poll (Duration .ofMillis (100 ));
48
+ try ( val consumer = createTestConsumerWithGroupId (groupId )) {
49
+ consumer .subscribe (List .of (topicName ));
50
+ consumer .poll (Duration .ofMillis (100 ));
53
51
54
- //Unsubscribe from all topics to be able to delete this consumer
55
- consumer .unsubscribe ();
52
+ //Unsubscribe from all topics to be able to delete this consumer
53
+ consumer .unsubscribe ();
54
+ }
56
55
57
56
//Delete the consumer when it's INACTIVE and check
58
57
webTestClient
@@ -69,24 +68,24 @@ void shouldBeBadRequestWhenConsumerGroupIsActive() {
69
68
70
69
//Create a consumer and subscribe to the topic
71
70
String groupId = UUID .randomUUID ().toString ();
72
- val consumer = createTestConsumerWithGroupId (groupId );
73
- consumer .subscribe (List .of (topicName ));
74
- consumer .poll (Duration .ofMillis (100 ));
71
+ try ( val consumer = createTestConsumerWithGroupId (groupId )) {
72
+ consumer .subscribe (List .of (topicName ));
73
+ consumer .poll (Duration .ofMillis (100 ));
75
74
76
- //Try to delete the consumer when it's ACTIVE
77
- String expError = "The group is not empty" ;
78
- webTestClient
79
- . delete ( )
80
- . uri ( "/api/clusters/{clusterName}/consumer-groups/{groupId}" , LOCAL , groupId )
81
- . exchange ()
82
- . expectStatus ()
83
- . isBadRequest ();
75
+ //Try to delete the consumer when it's ACTIVE
76
+ webTestClient
77
+ . delete ()
78
+ . uri ( "/api/clusters/{clusterName}/consumer-groups/{groupId}" , LOCAL , groupId )
79
+ . exchange ( )
80
+ . expectStatus ()
81
+ . isBadRequest ();
82
+ }
84
83
}
85
84
86
85
@ Test
87
86
void shouldReturnConsumerGroupsWithPagination () throws Exception {
88
- try (var groups1 = startConsumerGroups (3 , "cgPageTest1" );
89
- var groups2 = startConsumerGroups (2 , "cgPageTest2" )) {
87
+ try (var ignored = startConsumerGroups (3 , "cgPageTest1" );
88
+ var ignored1 = startConsumerGroups (2 , "cgPageTest2" )) {
90
89
webTestClient
91
90
.get ()
92
91
.uri ("/api/clusters/{clusterName}/consumer-groups/paged?perPage=3&search=cgPageTest" , LOCAL )
@@ -114,19 +113,19 @@ void shouldReturnConsumerGroupsWithPagination() throws Exception {
114
113
});
115
114
116
115
webTestClient
117
- .get ()
118
- .uri ("/api/clusters/{clusterName}/consumer-groups/paged?perPage=10&&search"
119
- + "=cgPageTest&orderBy=NAME&sortOrder=DESC" , LOCAL )
120
- .exchange ()
121
- .expectStatus ()
122
- .isOk ()
123
- .expectBody (ConsumerGroupsPageResponseDTO .class )
124
- .value (page -> {
125
- assertThat (page .getPageCount ()).isEqualTo (1 );
126
- assertThat (page .getConsumerGroups ().size ()).isEqualTo (5 );
127
- assertThat (page .getConsumerGroups ())
128
- .isSortedAccordingTo (Comparator .comparing (ConsumerGroupDTO ::getGroupId ).reversed ());
129
- });
116
+ .get ()
117
+ .uri ("/api/clusters/{clusterName}/consumer-groups/paged?perPage=10&&search"
118
+ + "=cgPageTest&orderBy=NAME&sortOrder=DESC" , LOCAL )
119
+ .exchange ()
120
+ .expectStatus ()
121
+ .isOk ()
122
+ .expectBody (ConsumerGroupsPageResponseDTO .class )
123
+ .value (page -> {
124
+ assertThat (page .getPageCount ()).isEqualTo (1 );
125
+ assertThat (page .getConsumerGroups ().size ()).isEqualTo (5 );
126
+ assertThat (page .getConsumerGroups ())
127
+ .isSortedAccordingTo (Comparator .comparing (ConsumerGroupDTO ::getGroupId ).reversed ());
128
+ });
130
129
131
130
webTestClient
132
131
.get ()
@@ -156,7 +155,7 @@ private Closeable startConsumerGroups(int count, String consumerGroupPrefix) {
156
155
return consumer ;
157
156
})
158
157
.limit (count )
159
- .collect ( Collectors . toList () );
158
+ .toList ();
160
159
return () -> {
161
160
consumers .forEach (KafkaConsumer ::close );
162
161
deleteTopic (topicName );
0 commit comments