Skip to content

Commit f36c18d

Browse files
p-eyeHaarolean
andauthored
Consumers: Unsubscribe topics from consumer group (#549)
Co-authored-by: Roman Zabaluev <gpg@haarolean.dev>
1 parent fbef485 commit f36c18d

File tree

7 files changed

+208
-8
lines changed

7 files changed

+208
-8
lines changed

api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,24 @@ public Mono<ResponseEntity<Void>> deleteConsumerGroup(String clusterName,
5959
.thenReturn(ResponseEntity.ok().build());
6060
}
6161

62+
@Override
63+
public Mono<ResponseEntity<Void>> deleteConsumerGroupOffsets(String clusterName,
64+
String groupId,
65+
String topicName,
66+
ServerWebExchange exchange) {
67+
var context = AccessContext.builder()
68+
.cluster(clusterName)
69+
.consumerGroupActions(groupId, RESET_OFFSETS)
70+
.topicActions(topicName, TopicAction.VIEW)
71+
.operationName("deleteConsumerGroupOffsets")
72+
.build();
73+
74+
return validateAccess(context)
75+
.then(consumerGroupService.deleteConsumerGroupOffset(getCluster(clusterName), groupId, topicName))
76+
.doOnEach(sig -> audit(context, sig))
77+
.thenReturn(ResponseEntity.ok().build());
78+
}
79+
6280
@Override
6381
public Mono<ResponseEntity<ConsumerGroupDetailsDTO>> getConsumerGroup(String clusterName,
6482
String consumerGroupId,

api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -209,12 +209,13 @@ private Mono<List<ConsumerGroupDescription>> describeConsumerGroups(ReactiveAdmi
209209
}
210210

211211

212-
private Mono<List<ConsumerGroupDescription>> loadDescriptionsByInternalConsumerGroups(ReactiveAdminClient ac,
213-
List<ConsumerGroupListing> groups,
214-
Comparator<GroupWithDescr> comparator,
215-
int pageNum,
216-
int perPage,
217-
SortOrderDTO sortOrderDto) {
212+
private Mono<List<ConsumerGroupDescription>> loadDescriptionsByInternalConsumerGroups(
213+
ReactiveAdminClient ac,
214+
List<ConsumerGroupListing> groups,
215+
Comparator<GroupWithDescr> comparator,
216+
int pageNum,
217+
int perPage,
218+
SortOrderDTO sortOrderDto) {
218219
var groupNames = groups.stream().map(ConsumerGroupListing::groupId).toList();
219220

220221
return ac.describeConsumerGroups(groupNames)
@@ -247,6 +248,13 @@ public Mono<Void> deleteConsumerGroupById(KafkaCluster cluster,
247248
.flatMap(adminClient -> adminClient.deleteConsumerGroups(List.of(groupId)));
248249
}
249250

251+
public Mono<Void> deleteConsumerGroupOffset(KafkaCluster cluster,
252+
String groupId,
253+
String topicName) {
254+
return adminClientService.get(cluster)
255+
.flatMap(adminClient -> adminClient.deleteConsumerGroupOffsets(groupId, topicName));
256+
}
257+
250258
public EnhancedConsumer createConsumer(KafkaCluster cluster) {
251259
return createConsumer(cluster, Map.of());
252260
}

api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.apache.kafka.common.errors.ClusterAuthorizationException;
7575
import org.apache.kafka.common.errors.GroupIdNotFoundException;
7676
import org.apache.kafka.common.errors.GroupNotEmptyException;
77+
import org.apache.kafka.common.errors.GroupSubscribedToTopicException;
7778
import org.apache.kafka.common.errors.InvalidRequestException;
7879
import org.apache.kafka.common.errors.SecurityDisabledException;
7980
import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -436,6 +437,27 @@ public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
436437
th -> Mono.error(new IllegalEntityStateException("The group is not empty")));
437438
}
438439

440+
public Mono<Void> deleteConsumerGroupOffsets(String groupId, String topicName) {
441+
return listConsumerGroupOffsets(List.of(groupId), null)
442+
.flatMap(table -> {
443+
// filter TopicPartitions by topicName
444+
Set<TopicPartition> partitions = table.row(groupId).keySet().stream()
445+
.filter(tp -> tp.topic().equals(topicName))
446+
.collect(Collectors.toSet());
447+
// check if partitions have no committed offsets
448+
return partitions.isEmpty()
449+
? Mono.error(new NotFoundException("The topic or partition is unknown"))
450+
// call deleteConsumerGroupOffsets
451+
: toMono(client.deleteConsumerGroupOffsets(groupId, partitions).all());
452+
})
453+
.onErrorResume(GroupIdNotFoundException.class,
454+
th -> Mono.error(new NotFoundException("The group id does not exist")))
455+
.onErrorResume(UnknownTopicOrPartitionException.class,
456+
th -> Mono.error(new NotFoundException("The topic or partition is unknown")))
457+
.onErrorResume(GroupSubscribedToTopicException.class,
458+
th -> Mono.error(new IllegalEntityStateException("The group is not empty")));
459+
}
460+
439461
public Mono<Void> createTopic(String name,
440462
int numPartitions,
441463
@Nullable Integer replicationFactor,

api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import io.kafbat.ui.model.ConsumerGroupDTO;
66
import io.kafbat.ui.model.ConsumerGroupsPageResponseDTO;
7+
import io.kafbat.ui.producer.KafkaTestProducer;
78
import java.io.Closeable;
89
import java.time.Duration;
910
import java.util.Comparator;
@@ -22,6 +23,8 @@
2223
import org.junit.jupiter.api.Test;
2324
import org.springframework.beans.factory.annotation.Autowired;
2425
import org.springframework.test.web.reactive.server.WebTestClient;
26+
import reactor.core.publisher.Flux;
27+
import reactor.core.publisher.Mono;
2528

2629
@Slf4j
2730
public class KafkaConsumerGroupTests extends AbstractIntegrationTest {
@@ -31,12 +34,76 @@ public class KafkaConsumerGroupTests extends AbstractIntegrationTest {
3134
@Test
3235
void shouldNotFoundWhenNoSuchConsumerGroupId() {
3336
String groupId = "groupA";
37+
String topicName = "topicX";
38+
3439
webTestClient
3540
.delete()
3641
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId)
3742
.exchange()
3843
.expectStatus()
3944
.isNotFound();
45+
46+
webTestClient
47+
.delete()
48+
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}/topics/{topicName}", LOCAL, groupId, topicName)
49+
.exchange()
50+
.expectStatus()
51+
.isNotFound();
52+
}
53+
54+
@Test
55+
void shouldNotFoundWhenNoSuchTopic() {
56+
String topicName = createTopicWithRandomName();
57+
String topicNameUnSubscribed = "topicX";
58+
59+
//Create a consumer and subscribe to the topic
60+
String groupId = UUID.randomUUID().toString();
61+
try (val consumer = createTestConsumerWithGroupId(groupId)) {
62+
consumer.subscribe(List.of(topicName));
63+
consumer.poll(Duration.ofMillis(100));
64+
65+
webTestClient
66+
.delete()
67+
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}/topics/{topicName}", LOCAL, groupId,
68+
topicNameUnSubscribed)
69+
.exchange()
70+
.expectStatus()
71+
.isNotFound();
72+
}
73+
}
74+
75+
@Test
76+
void shouldOkWhenConsumerGroupIsNotActiveAndPartitionOffsetExists() {
77+
String topicName = createTopicWithRandomName();
78+
79+
//Create a consumer and subscribe to the topic
80+
String groupId = UUID.randomUUID().toString();
81+
82+
try (KafkaTestProducer<String, String> producer = KafkaTestProducer.forKafka(kafka)) {
83+
Flux.fromStream(
84+
Stream.of("one", "two", "three", "four")
85+
.map(value -> Mono.fromFuture(producer.send(topicName, value)))
86+
).blockLast();
87+
} catch (Throwable e) {
88+
log.error("Error on sending", e);
89+
throw new RuntimeException(e);
90+
}
91+
92+
try (val consumer = createTestConsumerWithGroupId(groupId)) {
93+
consumer.subscribe(List.of(topicName));
94+
consumer.poll(Duration.ofMillis(100));
95+
96+
//Stop consumers to delete consumer offset from the topic
97+
consumer.pause(consumer.assignment());
98+
}
99+
100+
//Delete the consumer offset when it's INACTIVE and check
101+
webTestClient
102+
.delete()
103+
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}/topics/{topicName}", LOCAL, groupId, topicName)
104+
.exchange()
105+
.expectStatus()
106+
.isOk();
40107
}
41108

42109
@Test

contract/src/main/resources/swagger/kafbat-ui-api.yaml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,6 +1048,32 @@ paths:
10481048
200:
10491049
description: OK
10501050

1051+
/api/clusters/{clusterName}/consumer-groups/{id}/topics/{topicName}:
1052+
delete:
1053+
tags:
1054+
- Consumer Groups
1055+
summary: delete consumer group offsets
1056+
operationId: deleteConsumerGroupOffsets
1057+
parameters:
1058+
- name: clusterName
1059+
in: path
1060+
required: true
1061+
schema:
1062+
type: string
1063+
- name: id
1064+
in: path
1065+
required: true
1066+
schema:
1067+
type: string
1068+
- name: topicName
1069+
in: path
1070+
required: true
1071+
schema:
1072+
type: string
1073+
responses:
1074+
200:
1075+
description: OK
1076+
10511077
/api/clusters/{clusterName}/schemas:
10521078
post:
10531079
tags:

frontend/src/components/ConsumerGroups/Details/ListItem.tsx

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
11
import React from 'react';
2-
import { ConsumerGroupTopicPartition } from 'generated-sources';
2+
import {
3+
Action,
4+
ConsumerGroupTopicPartition,
5+
ResourceType,
6+
} from 'generated-sources';
37
import { Link } from 'react-router-dom';
48
import { ClusterName } from 'lib/interfaces/cluster';
5-
import { clusterTopicPath } from 'lib/paths';
9+
import { ClusterGroupParam, clusterTopicPath } from 'lib/paths';
10+
import { useDeleteConsumerGroupOffsetsMutation } from 'lib/hooks/api/consumers';
11+
import useAppParams from 'lib/hooks/useAppParams';
12+
import { Dropdown } from 'components/common/Dropdown';
13+
import { ActionDropdownItem } from 'components/common/ActionComponent';
614
import MessageToggleIcon from 'components/common/Icons/MessageToggleIcon';
715
import IconButtonWrapper from 'components/common/Icons/IconButtonWrapper';
816
import { TableKeyLink } from 'components/common/table/Table/TableKeyLink.styled';
@@ -18,6 +26,9 @@ interface Props {
1826

1927
const ListItem: React.FC<Props> = ({ clusterName, name, consumers }) => {
2028
const [isOpen, setIsOpen] = React.useState(false);
29+
const consumerProps = useAppParams<ClusterGroupParam>();
30+
const deleteOffsetMutation =
31+
useDeleteConsumerGroupOffsetsMutation(consumerProps);
2132

2233
const getTotalconsumerLag = () => {
2334
let count = 0;
@@ -27,6 +38,11 @@ const ListItem: React.FC<Props> = ({ clusterName, name, consumers }) => {
2738
return count;
2839
};
2940

41+
const deleteOffsetHandler = (topicName?: string) => {
42+
if (topicName === undefined) return;
43+
deleteOffsetMutation.mutateAsync(topicName);
44+
};
45+
3046
return (
3147
<>
3248
<tr>
@@ -41,6 +57,22 @@ const ListItem: React.FC<Props> = ({ clusterName, name, consumers }) => {
4157
</FlexWrapper>
4258
</td>
4359
<td>{getTotalconsumerLag()}</td>
60+
<td>
61+
<Dropdown>
62+
<ActionDropdownItem
63+
onClick={() => deleteOffsetHandler(name)}
64+
danger
65+
confirm="Are you sure you want to delete offsets from the topic?"
66+
permission={{
67+
resource: ResourceType.CONSUMER,
68+
action: Action.RESET_OFFSETS,
69+
value: consumerProps.consumerGroupID,
70+
}}
71+
>
72+
<span>Delete offsets</span>
73+
</ActionDropdownItem>
74+
</Dropdown>
75+
</td>
4476
</tr>
4577
{isOpen && <TopicContents consumers={consumers} />}
4678
</>

frontend/src/lib/hooks/api/consumers.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,3 +90,30 @@ export const useResetConsumerGroupOffsetsMutation = ({
9090
}
9191
);
9292
};
93+
94+
export const useDeleteConsumerGroupOffsetsMutation = ({
95+
clusterName,
96+
consumerGroupID,
97+
}: UseConsumerGroupDetailsProps) => {
98+
const queryClient = useQueryClient();
99+
return useMutation(
100+
(topicName: string) =>
101+
api.deleteConsumerGroupOffsets({
102+
clusterName,
103+
id: consumerGroupID,
104+
topicName,
105+
}),
106+
{
107+
onSuccess: (_, topicName) => {
108+
showSuccessAlert({
109+
message: `Consumer ${consumerGroupID} group offsets in topic ${topicName} deleted`,
110+
});
111+
queryClient.invalidateQueries([
112+
'clusters',
113+
clusterName,
114+
'consumerGroups',
115+
]);
116+
},
117+
}
118+
);
119+
};

0 commit comments

Comments
 (0)