Skip to content

Commit bd4fb36

Browse files
committed
FE: Add Prevoius button to topic message page - backend side (kafbat#550)
1 parent ce7e27f commit bd4fb36

File tree

5 files changed

+54
-17
lines changed

5 files changed

+54
-17
lines changed

api/src/main/java/io/kafbat/ui/emitter/ConsumingStats.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,14 @@ void incFilterApplyError() {
2929
}
3030

3131
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
32+
String previousCursorId = cursor != null ? cursor.getPreviousCursorId() : null;
3233
sink.next(
3334
new TopicMessageEventDTO()
3435
.type(TopicMessageEventDTO.TypeEnum.DONE)
35-
.prevCursor( // FIXME
36-
null
36+
.prevCursor(
37+
previousCursorId != null
38+
? new TopicMessagePageCursorDTO().id(previousCursorId)
39+
: null
3740
)
3841
.nextCursor(
3942
cursor != null

api/src/main/java/io/kafbat/ui/emitter/Cursor.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import io.kafbat.ui.serdes.ConsumerRecordDeserializer;
99
import java.util.HashMap;
1010
import java.util.Map;
11+
import java.util.Optional;
12+
import java.util.function.BiFunction;
1113
import java.util.function.Function;
1214
import java.util.function.Predicate;
1315
import org.apache.kafka.common.TopicPartition;
@@ -22,7 +24,9 @@ public static class Tracking {
2224
private final ConsumerPosition originalPosition;
2325
private final Predicate<TopicMessageDTO> filter;
2426
private final int limit;
25-
private final Function<Cursor, String> registerAction;
27+
private final String cursorId;
28+
private final BiFunction<Cursor, String, String> registerAction;
29+
private final Function<String, Optional<String>> previousCursorIdGetter;
2630

2731
//topic -> partition -> offset
2832
private final Table<String, Integer, Long> trackingOffsets = HashBasedTable.create();
@@ -31,12 +35,16 @@ public Tracking(ConsumerRecordDeserializer deserializer,
3135
ConsumerPosition originalPosition,
3236
Predicate<TopicMessageDTO> filter,
3337
int limit,
34-
Function<Cursor, String> registerAction) {
38+
String cursorId,
39+
BiFunction<Cursor, String, String> registerAction,
40+
Function<String, Optional<String>> previousCursorIdGetter) {
3541
this.deserializer = deserializer;
3642
this.originalPosition = originalPosition;
3743
this.filter = filter;
3844
this.limit = limit;
45+
this.cursorId = cursorId;
3946
this.registerAction = registerAction;
47+
this.previousCursorIdGetter = previousCursorIdGetter;
4048
}
4149

4250
void trackOffset(String topic, int partition, long offset) {
@@ -82,9 +90,14 @@ String registerCursor() {
8290
),
8391
filter,
8492
limit
85-
)
93+
),
94+
this.cursorId
8695
);
8796
}
97+
98+
String getPreviousCursorId() {
99+
return this.previousCursorIdGetter.apply(this.cursorId).orElse(null);
100+
}
88101
}
89102

90103
}

api/src/main/java/io/kafbat/ui/service/MessagesService.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,8 @@ public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster,
222222
deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde),
223223
consumerPosition,
224224
getMsgFilter(containsStringFilter, filterId),
225-
fixPageSize(limit)
225+
fixPageSize(limit),
226+
null
226227
);
227228
}
228229

@@ -235,7 +236,8 @@ public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topi
235236
cursor.deserializer(),
236237
cursor.consumerPosition(),
237238
cursor.filter(),
238-
cursor.limit()
239+
cursor.limit(),
240+
cursorId
239241
);
240242
}
241243

@@ -244,18 +246,20 @@ private Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster,
244246
ConsumerRecordDeserializer deserializer,
245247
ConsumerPosition consumerPosition,
246248
Predicate<TopicMessageDTO> filter,
247-
int limit) {
249+
int limit,
250+
String cursorId) {
248251
return withExistingTopic(cluster, topic)
249252
.flux()
250253
.publishOn(Schedulers.boundedElastic())
251-
.flatMap(td -> loadMessagesImpl(cluster, deserializer, consumerPosition, filter, limit));
254+
.flatMap(td -> loadMessagesImpl(cluster, deserializer, consumerPosition, filter, limit, cursorId));
252255
}
253256

254257
private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
255258
ConsumerRecordDeserializer deserializer,
256259
ConsumerPosition consumerPosition,
257260
Predicate<TopicMessageDTO> filter,
258-
int limit) {
261+
int limit,
262+
String cursorId) {
259263
var emitter = switch (consumerPosition.pollingMode()) {
260264
case TO_OFFSET, TO_TIMESTAMP, LATEST -> new BackwardEmitter(
261265
() -> consumerGroupService.createConsumer(cluster),
@@ -264,7 +268,7 @@ private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
264268
deserializer,
265269
filter,
266270
cluster.getPollingSettings(),
267-
cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit)
271+
cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit, cursorId)
268272
);
269273
case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> new ForwardEmitter(
270274
() -> consumerGroupService.createConsumer(cluster),
@@ -273,7 +277,7 @@ private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
273277
deserializer,
274278
filter,
275279
cluster.getPollingSettings(),
276-
cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit)
280+
cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit, cursorId)
277281
);
278282
case TAILING -> new TailingEmitter(
279283
() -> consumerGroupService.createConsumer(cluster),

api/src/main/java/io/kafbat/ui/service/PollingCursorsStorage.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import io.kafbat.ui.model.ConsumerPosition;
88
import io.kafbat.ui.model.TopicMessageDTO;
99
import io.kafbat.ui.serdes.ConsumerRecordDeserializer;
10+
import jakarta.annotation.Nullable;
1011
import java.util.Map;
1112
import java.util.Optional;
1213
import java.util.function.Predicate;
@@ -20,23 +21,39 @@ public class PollingCursorsStorage {
2021
.maximumSize(MAX_SIZE)
2122
.build();
2223

24+
private final Cache<String, String> previousCursorsMap = CacheBuilder.newBuilder()
25+
.maximumSize(MAX_SIZE)
26+
.build();
27+
2328
public Cursor.Tracking createNewCursor(ConsumerRecordDeserializer deserializer,
2429
ConsumerPosition originalPosition,
2530
Predicate<TopicMessageDTO> filter,
26-
int limit) {
27-
return new Cursor.Tracking(deserializer, originalPosition, filter, limit, this::register);
31+
int limit,
32+
@Nullable String cursorId) {
33+
return new Cursor.Tracking(deserializer, originalPosition, filter, limit, cursorId, this::register,
34+
this::getPreviousCursorId);
2835
}
2936

3037
public Optional<Cursor> getCursor(String id) {
3138
return Optional.ofNullable(cursorsCache.getIfPresent(id));
3239
}
3340

34-
public String register(Cursor cursor) {
41+
public String register(Cursor nextCursor, @Nullable String currentCursorId) {
3542
var id = RandomStringUtils.random(8, true, true);
36-
cursorsCache.put(id, cursor);
43+
cursorsCache.put(id, nextCursor);
44+
if (currentCursorId != null) {
45+
previousCursorsMap.put(id, currentCursorId);
46+
}
3747
return id;
3848
}
3949

50+
public Optional<String> getPreviousCursorId(@Nullable String cursorId) {
51+
if (cursorId == null) {
52+
return Optional.empty();
53+
}
54+
return Optional.ofNullable(previousCursorsMap.getIfPresent(cursorId));
55+
}
56+
4057
@VisibleForTesting
4158
public Map<String, Cursor> asMap() {
4259
return cursorsCache.asMap();

api/src/test/java/io/kafbat/ui/emitter/CursorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ private ForwardEmitter createForwardEmitter(ConsumerPosition position) {
164164
}
165165

166166
private Cursor.Tracking createCursor(ConsumerPosition position) {
167-
return cursorsStorage.createNewCursor(createRecordsDeserializer(), position, m -> true, PAGE_SIZE);
167+
return cursorsStorage.createNewCursor(createRecordsDeserializer(), position, m -> true, PAGE_SIZE, null);
168168
}
169169

170170
private EnhancedConsumer createConsumer() {

0 commit comments

Comments
 (0)