|
46 | 46 | import org.apache.kafka.clients.producer.RecordMetadata;
|
47 | 47 | import org.apache.kafka.common.TopicPartition;
|
48 | 48 | import org.apache.kafka.common.serialization.ByteArraySerializer;
|
| 49 | +import org.jetbrains.annotations.NotNull; |
49 | 50 | import org.springframework.stereotype.Service;
|
50 | 51 | import reactor.core.publisher.Flux;
|
51 | 52 | import reactor.core.publisher.Mono;
|
@@ -216,42 +217,29 @@ public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster,
|
216 | 217 | @Nullable Integer limit,
|
217 | 218 | @Nullable String keySerde,
|
218 | 219 | @Nullable String valueSerde) {
|
219 |
| - return loadMessages( |
220 |
| - cluster, |
221 |
| - topic, |
| 220 | + Cursor cursor = new Cursor( |
222 | 221 | deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde),
|
223 | 222 | consumerPosition,
|
224 | 223 | getMsgFilter(containsStringFilter, filterId),
|
225 |
| - fixPageSize(limit), |
226 |
| - null |
| 224 | + fixPageSize(limit) |
227 | 225 | );
|
| 226 | + String cursorId = cursorsStorage.register(cursor, null); |
| 227 | + return loadMessages(cluster, topic, cursorId, cursor); |
228 | 228 | }
|
229 | 229 |
|
230 | 230 | public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic, String cursorId) {
|
231 | 231 | Cursor cursor = cursorsStorage.getCursor(cursorId)
|
232 | 232 | .orElseThrow(() -> new ValidationException("Next page cursor not found. Maybe it was evicted from cache."));
|
233 |
| - return loadMessages( |
234 |
| - cluster, |
235 |
| - topic, |
236 |
| - cursor.deserializer(), |
237 |
| - cursor.consumerPosition(), |
238 |
| - cursor.filter(), |
239 |
| - cursor.limit(), |
240 |
| - cursorId |
241 |
| - ); |
| 233 | + return loadMessages(cluster, topic, cursorId, cursor); |
242 | 234 | }
|
243 | 235 |
|
244 |
| - private Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, |
245 |
| - String topic, |
246 |
| - ConsumerRecordDeserializer deserializer, |
247 |
| - ConsumerPosition consumerPosition, |
248 |
| - Predicate<TopicMessageDTO> filter, |
249 |
| - int limit, |
250 |
| - String cursorId) { |
| 236 | + private @NotNull Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic, |
| 237 | + String cursorId, Cursor cursor) { |
251 | 238 | return withExistingTopic(cluster, topic)
|
252 | 239 | .flux()
|
253 | 240 | .publishOn(Schedulers.boundedElastic())
|
254 |
| - .flatMap(td -> loadMessagesImpl(cluster, deserializer, consumerPosition, filter, limit, cursorId)); |
| 241 | + .flatMap(td -> loadMessagesImpl(cluster, |
| 242 | + cursor.deserializer(), cursor.consumerPosition(), cursor.filter(), cursor.limit(), cursorId)); |
255 | 243 | }
|
256 | 244 |
|
257 | 245 | private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
|
|
0 commit comments