Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion framework-platform/framework-platform.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ dependencies {
api(platform("com.fasterxml.jackson:jackson-bom:2.20.0"))
api(platform("io.micrometer:micrometer-bom:1.16.0-M3"))
api(platform("io.netty:netty-bom:4.2.6.Final"))
api(platform("io.projectreactor:reactor-bom:2025.0.0-M7"))
api(platform("io.projectreactor:reactor-bom:2025.0.0-SNAPSHOT"))
api(platform("io.rsocket:rsocket-bom:1.1.5"))
api(platform("org.apache.groovy:groovy-bom:5.0.0-rc-1"))
api(platform("org.apache.logging.log4j:log4j-bom:3.0.0-beta3"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -1170,7 +1171,7 @@ private class ReactiveCachingHandler {
if (invokeFailure.get()) {
return Mono.error(ex);
}
return (Mono) invokeOperation(invoker);
return (Mono) Objects.requireNonNull(invokeOperation(invoker));
}
catch (RuntimeException exception) {
return Mono.error(exception);
Expand Down Expand Up @@ -1201,8 +1202,8 @@ private class ReactiveCachingHandler {
}
if (adapter.isMultiValue()) {
return adapter.fromPublisher(Flux.from(Mono.fromFuture(cachedFuture))
.switchIfEmpty(Flux.defer(() -> (Flux) evaluate(null, invoker, method, contexts)))
.flatMap(v -> evaluate(valueToFlux(v, contexts), invoker, method, contexts))
.switchIfEmpty(Flux.defer(() -> (Flux) Objects.requireNonNull(evaluate(null, invoker, method, contexts))))
.flatMap(v -> Objects.requireNonNull(evaluate(valueToFlux(v, contexts), invoker, method, contexts)))
.onErrorResume(RuntimeException.class, ex -> {
try {
getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key);
Expand All @@ -1216,8 +1217,11 @@ private class ReactiveCachingHandler {
}
else {
return adapter.fromPublisher(Mono.fromFuture(cachedFuture)
.switchIfEmpty(Mono.defer(() -> (Mono) evaluate(null, invoker, method, contexts)))
.flatMap(v -> evaluate(Mono.justOrEmpty(unwrapCacheValue(v)), invoker, method, contexts))
// TODO: requireNonNull is not for
// free, perhaps we should
// suppress it
.switchIfEmpty(Mono.defer(() -> (Mono) Objects.requireNonNull(evaluate(null, invoker, method, contexts))))
.flatMap(v -> Objects.requireNonNull(evaluate(Mono.justOrEmpty(unwrapCacheValue(v)), invoker, method, contexts)))
.onErrorResume(RuntimeException.class, ex -> {
try {
getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.core.codec;

import java.util.Map;
import java.util.function.Function;

import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -84,15 +85,18 @@ public int getMaxInMemorySize() {
public Flux<T> decode(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

return Flux.from(input).map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
Function<DataBuffer, @Nullable T> decodeBufferFn =
buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints);
return Flux.from(input).map(decodeBufferFn);
}

@Override
public Mono<T> decodeToMono(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

return DataBufferUtils.join(input, this.maxInMemorySize)
.map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
Function<DataBuffer, @Nullable T> decodeBufferFn =
buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints);
return DataBufferUtils.join(input, this.maxInMemorySize).map(decodeBufferFn);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -227,7 +228,7 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
if (adapter != null && adapter.isMultiValue()) {
Flux<?> flux = content
.filter(this::nonEmptyDataBuffer)
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
.map(buffer -> Objects.requireNonNull(decoder.decode(buffer, elementType, mimeType, hints)))
.onErrorMap(ex -> handleReadError(parameter, message, ex));
if (isContentRequired) {
flux = flux.switchIfEmpty(Flux.error(() -> handleMissingBody(parameter, message)));
Expand All @@ -241,7 +242,7 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
// Single-value (with or without reactive type wrapper)
Mono<?> mono = content.next()
.filter(this::nonEmptyDataBuffer)
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
.map(buffer -> Objects.requireNonNull(decoder.decode(buffer, elementType, mimeType, hints)))
.onErrorMap(ex -> handleReadError(parameter, message, ex));
if (isContentRequired) {
mono = mono.switchIfEmpty(Mono.error(() -> handleMissingBody(parameter, message)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

import io.rsocket.Payload;
Expand Down Expand Up @@ -291,7 +292,7 @@ private <T> Mono<T> retrieveMono(ResolvableType elementType) {

Decoder<?> decoder = strategies.decoder(elementType, dataMimeType);
return (Mono<T>) payloadMono.map(this::retainDataAndReleasePayload)
.map(dataBuffer -> decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS));
.map(dataBuffer -> Objects.requireNonNull(decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS)));
}

@Override
Expand All @@ -317,7 +318,7 @@ private <T> Flux<T> retrieveFlux(ResolvableType elementType) {

Decoder<?> decoder = strategies.decoder(elementType, dataMimeType);
return payloadFlux.map(this::retainDataAndReleasePayload).map(dataBuffer ->
(T) decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS));
(T) Objects.requireNonNull(decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS)));
}

private Mono<Payload> getPayloadMono() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

import io.r2dbc.spi.Connection;
Expand Down Expand Up @@ -176,7 +177,7 @@ public Mono<? extends Connection> create() {
this.connection =
(isSuppressClose() ? getCloseSuppressingConnectionProxy(connectionToUse) : connectionToUse);
}
return this.connection;
return Objects.requireNonNull(this.connection);
}).flatMap(this::prepareConnection);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.transaction.interceptor;

import java.lang.reflect.Method;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -903,7 +904,7 @@ public Object invokeWithinTransaction(Method method, @Nullable Class<?> targetCl
createTransactionIfNecessary(rtm, txAttr, joinpointIdentification),
tx -> {
try {
return (Mono<?>) invocation.proceedWithInvocation();
return (Mono<?>) Objects.requireNonNull(invocation.proceedWithInvocation());
}
catch (Throwable ex) {
return Mono.error(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ abstract class AbstractCoroutinesTransactionAspectTests {

private fun checkReactiveTransaction(expected: Boolean) {
Mono.deferContextual{context -> Mono.just(context)}
.handle { context: ContextView, sink: SynchronousSink<Any?> ->
.handle { context: ContextView, sink: SynchronousSink<ContextView> ->
if (context.hasKey(TransactionContext::class.java) != expected) {
Fail.fail<Any>("Should have thrown NoTransactionException")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
return requestSender
.send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound)))
.responseConnection((response, connection) -> {
responseRef.set(new ReactorClientHttpResponse(response, connection));
return Mono.just((ClientHttpResponse) responseRef.get());
ReactorClientHttpResponse clientResponse = new ReactorClientHttpResponse(response, connection);
responseRef.set(clientResponse);
return Mono.just((ClientHttpResponse) clientResponse);
})
.next()
.doOnCancel(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ class KotlinSerializationCborEncoderTests : AbstractEncoderTests<KotlinSerializa
pojo3
)
val pojoBytes = Cbor.Default.encodeToByteArray(arrayOf(pojo1, pojo2, pojo3))
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer?> ->
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer> ->
step
.consumeNextWith(expectBytes(pojoBytes)
.andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) })
.andThen { dataBuffer: DataBuffer -> DataBufferUtils.release(dataBuffer) })
.verifyComplete()
}
}
Expand All @@ -78,7 +78,7 @@ class KotlinSerializationCborEncoderTests : AbstractEncoderTests<KotlinSerializa
fun encodeMono() {
val pojo = Pojo("foo", "bar")
val input = Mono.just(pojo)
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer?> ->
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer> ->
step
.consumeNextWith(expectBytes(Cbor.Default.encodeToByteArray(pojo))
.andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ class CustomKotlinSerializationJsonEncoderTests :
@Test
override fun encode() {
val input = Mono.just(BigDecimal(1))
testEncode(input, BigDecimal::class.java) { step: StepVerifier.FirstStep<DataBuffer?> ->
testEncode(input, BigDecimal::class.java) { step: StepVerifier.FirstStep<DataBuffer> ->
step.consumeNextWith(expectString("1.0")
.andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) })
.andThen { dataBuffer: DataBuffer -> DataBufferUtils.release(dataBuffer) })
.verifyComplete()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ class KotlinSerializationProtobufEncoderTests : AbstractEncoderTests<KotlinSeria
pojo3
)
val pojoBytes = ProtoBuf.Default.encodeToByteArray(arrayOf(pojo1, pojo2, pojo3))
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer?> ->
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer> ->
step
.consumeNextWith(expectBytes(pojoBytes)
.andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) })
.andThen { dataBuffer: DataBuffer -> DataBufferUtils.release(dataBuffer) })
.verifyComplete()
}
}
Expand All @@ -88,10 +88,10 @@ class KotlinSerializationProtobufEncoderTests : AbstractEncoderTests<KotlinSeria
fun encodeMono() {
val pojo = Pojo("foo", "bar")
val input = Mono.just(pojo)
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer?> ->
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer> ->
step
.consumeNextWith(expectBytes(ProtoBuf.Default.encodeToByteArray(pojo))
.andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) })
.andThen { dataBuffer: DataBuffer -> DataBufferUtils.release(dataBuffer) })
.verifyComplete()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package org.springframework.web.reactive.result.method.annotation;

import java.util.function.Function;

import org.jspecify.annotations.Nullable;
import reactor.core.publisher.Mono;

import org.springframework.beans.factory.config.ConfigurableBeanFactory;
Expand All @@ -26,6 +29,7 @@
import org.springframework.web.bind.annotation.ValueConstants;
import org.springframework.web.server.MissingRequestValueException;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebSession;

/**
* Resolves method arguments annotated with an @{@link SessionAttribute}.
Expand Down Expand Up @@ -55,9 +59,8 @@ protected NamedValueInfo createNamedValueInfo(MethodParameter parameter) {

@Override
protected Mono<Object> resolveName(String name, MethodParameter parameter, ServerWebExchange exchange) {
return exchange.getSession()
.filter(session -> session.getAttribute(name) != null)
.map(session -> session.getAttribute(name));
Function<WebSession, @Nullable Object> extractAttributeFn = session -> session.getAttribute(name);
return exchange.getSession().mapNotNull(extractAttributeFn);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,8 @@ class CoRouterFunctionDsl internal constructor (private val init: (CoRouterFunct
}

@PublishedApi
internal fun <T> asMono(request: ServerRequest, context: CoroutineContext = Dispatchers.Unconfined, handler: suspend (ServerRequest) -> T): Mono<T> {
internal fun <T : Any> asMono(request: ServerRequest, context: CoroutineContext =
Dispatchers.Unconfined, handler: suspend (ServerRequest) -> T?): Mono<T> {
return mono(context) {
contextProvider?.let {
withContext(it.invoke(request)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,8 @@ class InvocableHandlerMethodKotlinTests {
StepVerifier.create(mono)
.consumeNextWith {
if (it.returnValue is Mono<*>) {
StepVerifier.create(it.returnValue as Mono<*>).expectNext(expected).verifyComplete()
StepVerifier.create(it.returnValue as Mono<*>).expectNext(requireNotNull(expected))
.verifyComplete()
} else {
assertThat(it.returnValue).isEqualTo(expected)
}
Expand Down