diff --git a/framework-platform/framework-platform.gradle b/framework-platform/framework-platform.gradle index 56061d0e40fc..e724dc3a4707 100644 --- a/framework-platform/framework-platform.gradle +++ b/framework-platform/framework-platform.gradle @@ -10,7 +10,7 @@ dependencies { api(platform("com.fasterxml.jackson:jackson-bom:2.20.0")) api(platform("io.micrometer:micrometer-bom:1.16.0-M3")) api(platform("io.netty:netty-bom:4.2.6.Final")) - api(platform("io.projectreactor:reactor-bom:2025.0.0-M7")) + api(platform("io.projectreactor:reactor-bom:2025.0.0-SNAPSHOT")) api(platform("io.rsocket:rsocket-bom:1.1.5")) api(platform("org.apache.groovy:groovy-bom:5.0.0-rc-1")) api(platform("org.apache.logging.log4j:log4j-bom:3.0.0-beta3")) diff --git a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java index e72ced10c1ed..85efbcef6124 100644 --- a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java +++ b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -1170,7 +1171,7 @@ private class ReactiveCachingHandler { if (invokeFailure.get()) { return Mono.error(ex); } - return (Mono) invokeOperation(invoker); + return (Mono) Objects.requireNonNull(invokeOperation(invoker)); } catch (RuntimeException exception) { return Mono.error(exception); @@ -1201,8 +1202,8 @@ private class ReactiveCachingHandler { } if (adapter.isMultiValue()) { return adapter.fromPublisher(Flux.from(Mono.fromFuture(cachedFuture)) - .switchIfEmpty(Flux.defer(() -> (Flux) evaluate(null, invoker, method, contexts))) - .flatMap(v -> evaluate(valueToFlux(v, contexts), invoker, method, contexts)) + .switchIfEmpty(Flux.defer(() -> (Flux) Objects.requireNonNull(evaluate(null, invoker, method, contexts)))) + .flatMap(v -> Objects.requireNonNull(evaluate(valueToFlux(v, contexts), invoker, method, contexts))) .onErrorResume(RuntimeException.class, ex -> { try { getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key); @@ -1216,8 +1217,11 @@ private class ReactiveCachingHandler { } else { return adapter.fromPublisher(Mono.fromFuture(cachedFuture) - .switchIfEmpty(Mono.defer(() -> (Mono) evaluate(null, invoker, method, contexts))) - .flatMap(v -> evaluate(Mono.justOrEmpty(unwrapCacheValue(v)), invoker, method, contexts)) + // TODO: requireNonNull is not for + // free, perhaps we should + // suppress it + .switchIfEmpty(Mono.defer(() -> (Mono) Objects.requireNonNull(evaluate(null, invoker, method, contexts)))) + .flatMap(v -> Objects.requireNonNull(evaluate(Mono.justOrEmpty(unwrapCacheValue(v)), invoker, method, contexts))) .onErrorResume(RuntimeException.class, ex -> { try { getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key); diff --git a/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java index 78cbbf828500..426da45ea296 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java @@ -17,6 +17,7 @@ package org.springframework.core.codec; import java.util.Map; +import java.util.Objects; import org.jspecify.annotations.Nullable; import org.reactivestreams.Publisher; @@ -84,15 +85,16 @@ public int getMaxInMemorySize() { public Flux decode(Publisher input, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - return Flux.from(input).map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints)); + return Flux.from(input).map(buffer -> + Objects.requireNonNull(decodeDataBuffer(buffer, elementType, mimeType, hints))); } @Override public Mono decodeToMono(Publisher input, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - return DataBufferUtils.join(input, this.maxInMemorySize) - .map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints)); + return DataBufferUtils.join(input, this.maxInMemorySize).map(buffer -> + Objects.requireNonNull(decodeDataBuffer(buffer, elementType, mimeType, hints))); } /** diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/PayloadMethodArgumentResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/PayloadMethodArgumentResolver.java index a368b00eaa59..f877b35b149e 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/PayloadMethodArgumentResolver.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/PayloadMethodArgumentResolver.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.Consumer; import org.apache.commons.logging.Log; @@ -227,7 +228,7 @@ private Mono decodeContent(MethodParameter parameter, Message message if (adapter != null && adapter.isMultiValue()) { Flux flux = content .filter(this::nonEmptyDataBuffer) - .map(buffer -> decoder.decode(buffer, elementType, mimeType, hints)) + .map(buffer -> Objects.requireNonNull(decoder.decode(buffer, elementType, mimeType, hints))) .onErrorMap(ex -> handleReadError(parameter, message, ex)); if (isContentRequired) { flux = flux.switchIfEmpty(Flux.error(() -> handleMissingBody(parameter, message))); @@ -241,7 +242,7 @@ private Mono decodeContent(MethodParameter parameter, Message message // Single-value (with or without reactive type wrapper) Mono mono = content.next() .filter(this::nonEmptyDataBuffer) - .map(buffer -> decoder.decode(buffer, elementType, mimeType, hints)) + .map(buffer -> Objects.requireNonNull(decoder.decode(buffer, elementType, mimeType, hints))) .onErrorMap(ex -> handleReadError(parameter, message, ex)); if (isContentRequired) { mono = mono.switchIfEmpty(Mono.error(() -> handleMissingBody(parameter, message))); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java index 5d4435f078fc..42289ae3e5eb 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java @@ -18,6 +18,7 @@ import java.util.Collections; import java.util.Map; +import java.util.Objects; import java.util.function.Consumer; import io.rsocket.Payload; @@ -291,7 +292,7 @@ private Mono retrieveMono(ResolvableType elementType) { Decoder decoder = strategies.decoder(elementType, dataMimeType); return (Mono) payloadMono.map(this::retainDataAndReleasePayload) - .map(dataBuffer -> decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS)); + .map(dataBuffer -> Objects.requireNonNull(decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS))); } @Override @@ -317,7 +318,7 @@ private Flux retrieveFlux(ResolvableType elementType) { Decoder decoder = strategies.decoder(elementType, dataMimeType); return payloadFlux.map(this::retainDataAndReleasePayload).map(dataBuffer -> - (T) decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS)); + (T) Objects.requireNonNull(decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS))); } private Mono getPayloadMono() { diff --git a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/SingleConnectionFactory.java b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/SingleConnectionFactory.java index 5e079928acc3..2a6658c870db 100644 --- a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/SingleConnectionFactory.java +++ b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/SingleConnectionFactory.java @@ -20,6 +20,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; +import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import io.r2dbc.spi.Connection; @@ -176,7 +177,7 @@ public Mono create() { this.connection = (isSuppressClose() ? getCloseSuppressingConnectionProxy(connectionToUse) : connectionToUse); } - return this.connection; + return Objects.requireNonNull(this.connection); }).flatMap(this::prepareConnection); } diff --git a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java index 02a585686dea..ce76c08d90f5 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java +++ b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java @@ -17,6 +17,7 @@ package org.springframework.transaction.interceptor; import java.lang.reflect.Method; +import java.util.Objects; import java.util.Properties; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -903,7 +904,7 @@ public Object invokeWithinTransaction(Method method, @Nullable Class targetCl createTransactionIfNecessary(rtm, txAttr, joinpointIdentification), tx -> { try { - return (Mono) invocation.proceedWithInvocation(); + return (Mono) Objects.requireNonNull(invocation.proceedWithInvocation()); } catch (Throwable ex) { return Mono.error(ex); diff --git a/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt b/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt index 9383ec677fdf..a418692ae8bd 100644 --- a/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt +++ b/spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt @@ -301,7 +301,7 @@ abstract class AbstractCoroutinesTransactionAspectTests { private fun checkReactiveTransaction(expected: Boolean) { Mono.deferContextual{context -> Mono.just(context)} - .handle { context: ContextView, sink: SynchronousSink -> + .handle { context: ContextView, sink: SynchronousSink -> if (context.hasKey(TransactionContext::class.java) != expected) { Fail.fail("Should have thrown NoTransactionException") } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java index 09d6bbb8bc83..0c8eb18cf439 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java @@ -165,8 +165,9 @@ public Mono connect(HttpMethod method, URI uri, return requestSender .send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound))) .responseConnection((response, connection) -> { - responseRef.set(new ReactorClientHttpResponse(response, connection)); - return Mono.just((ClientHttpResponse) responseRef.get()); + ReactorClientHttpResponse clientResponse = new ReactorClientHttpResponse(response, connection); + responseRef.set(clientResponse); + return Mono.just((ClientHttpResponse) clientResponse); }) .next() .doOnCancel(() -> { diff --git a/spring-web/src/test/kotlin/org/springframework/http/codec/cbor/KotlinSerializationCborEncoderTests.kt b/spring-web/src/test/kotlin/org/springframework/http/codec/cbor/KotlinSerializationCborEncoderTests.kt index 7300eb69a286..cc99d3003f79 100644 --- a/spring-web/src/test/kotlin/org/springframework/http/codec/cbor/KotlinSerializationCborEncoderTests.kt +++ b/spring-web/src/test/kotlin/org/springframework/http/codec/cbor/KotlinSerializationCborEncoderTests.kt @@ -66,10 +66,10 @@ class KotlinSerializationCborEncoderTests : AbstractEncoderTests -> + testEncode(input, Pojo::class.java) { step: FirstStep -> step .consumeNextWith(expectBytes(pojoBytes) - .andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) }) + .andThen { dataBuffer: DataBuffer -> DataBufferUtils.release(dataBuffer) }) .verifyComplete() } } @@ -78,7 +78,7 @@ class KotlinSerializationCborEncoderTests : AbstractEncoderTests -> + testEncode(input, Pojo::class.java) { step: FirstStep -> step .consumeNextWith(expectBytes(Cbor.Default.encodeToByteArray(pojo)) .andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) }) diff --git a/spring-web/src/test/kotlin/org/springframework/http/codec/json/CustomKotlinSerializationJsonEncoderTests.kt b/spring-web/src/test/kotlin/org/springframework/http/codec/json/CustomKotlinSerializationJsonEncoderTests.kt index a8bbe6d9c2f0..037dab3b1e96 100644 --- a/spring-web/src/test/kotlin/org/springframework/http/codec/json/CustomKotlinSerializationJsonEncoderTests.kt +++ b/spring-web/src/test/kotlin/org/springframework/http/codec/json/CustomKotlinSerializationJsonEncoderTests.kt @@ -45,9 +45,9 @@ class CustomKotlinSerializationJsonEncoderTests : @Test override fun encode() { val input = Mono.just(BigDecimal(1)) - testEncode(input, BigDecimal::class.java) { step: StepVerifier.FirstStep -> + testEncode(input, BigDecimal::class.java) { step: StepVerifier.FirstStep -> step.consumeNextWith(expectString("1.0") - .andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) }) + .andThen { dataBuffer: DataBuffer -> DataBufferUtils.release(dataBuffer) }) .verifyComplete() } } diff --git a/spring-web/src/test/kotlin/org/springframework/http/codec/protobuf/KotlinSerializationProtobufEncoderTests.kt b/spring-web/src/test/kotlin/org/springframework/http/codec/protobuf/KotlinSerializationProtobufEncoderTests.kt index 800be3cfd299..36a808596029 100644 --- a/spring-web/src/test/kotlin/org/springframework/http/codec/protobuf/KotlinSerializationProtobufEncoderTests.kt +++ b/spring-web/src/test/kotlin/org/springframework/http/codec/protobuf/KotlinSerializationProtobufEncoderTests.kt @@ -76,10 +76,10 @@ class KotlinSerializationProtobufEncoderTests : AbstractEncoderTests -> + testEncode(input, Pojo::class.java) { step: FirstStep -> step .consumeNextWith(expectBytes(pojoBytes) - .andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) }) + .andThen { dataBuffer: DataBuffer -> DataBufferUtils.release(dataBuffer) }) .verifyComplete() } } @@ -88,10 +88,10 @@ class KotlinSerializationProtobufEncoderTests : AbstractEncoderTests -> + testEncode(input, Pojo::class.java) { step: FirstStep -> step .consumeNextWith(expectBytes(ProtoBuf.Default.encodeToByteArray(pojo)) - .andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) }) + .andThen { dataBuffer: DataBuffer -> DataBufferUtils.release(dataBuffer) }) .verifyComplete() } } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/SessionAttributeMethodArgumentResolver.java b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/SessionAttributeMethodArgumentResolver.java index ba6b271007a4..bc2d86fa4d79 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/SessionAttributeMethodArgumentResolver.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/SessionAttributeMethodArgumentResolver.java @@ -53,11 +53,10 @@ protected NamedValueInfo createNamedValueInfo(MethodParameter parameter) { return new NamedValueInfo(ann.name(), ann.required(), ValueConstants.DEFAULT_NONE); } + @SuppressWarnings("NullAway") // https://github.com/uber/NullAway/issues/1290 @Override protected Mono resolveName(String name, MethodParameter parameter, ServerWebExchange exchange) { - return exchange.getSession() - .filter(session -> session.getAttribute(name) != null) - .map(session -> session.getAttribute(name)); + return exchange.getSession().mapNotNull(session -> session.getAttribute(name)); } @Override diff --git a/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/server/CoRouterFunctionDsl.kt b/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/server/CoRouterFunctionDsl.kt index f022b2b3daf3..2ce1d0763470 100644 --- a/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/server/CoRouterFunctionDsl.kt +++ b/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/server/CoRouterFunctionDsl.kt @@ -739,7 +739,8 @@ class CoRouterFunctionDsl internal constructor (private val init: (CoRouterFunct } @PublishedApi - internal fun asMono(request: ServerRequest, context: CoroutineContext = Dispatchers.Unconfined, handler: suspend (ServerRequest) -> T): Mono { + internal fun asMono(request: ServerRequest, context: CoroutineContext = + Dispatchers.Unconfined, handler: suspend (ServerRequest) -> T?): Mono { return mono(context) { contextProvider?.let { withContext(it.invoke(request)) { diff --git a/spring-webflux/src/test/kotlin/org/springframework/web/reactive/result/InvocableHandlerMethodKotlinTests.kt b/spring-webflux/src/test/kotlin/org/springframework/web/reactive/result/InvocableHandlerMethodKotlinTests.kt index 2556e7b196bd..95a5df9cbbcf 100644 --- a/spring-webflux/src/test/kotlin/org/springframework/web/reactive/result/InvocableHandlerMethodKotlinTests.kt +++ b/spring-webflux/src/test/kotlin/org/springframework/web/reactive/result/InvocableHandlerMethodKotlinTests.kt @@ -381,7 +381,8 @@ class InvocableHandlerMethodKotlinTests { StepVerifier.create(mono) .consumeNextWith { if (it.returnValue is Mono<*>) { - StepVerifier.create(it.returnValue as Mono<*>).expectNext(expected).verifyComplete() + StepVerifier.create(it.returnValue as Mono<*>).expectNext(requireNotNull(expected)) + .verifyComplete() } else { assertThat(it.returnValue).isEqualTo(expected) }