Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion framework-platform/framework-platform.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ dependencies {
api(platform("com.fasterxml.jackson:jackson-bom:2.20.0"))
api(platform("io.micrometer:micrometer-bom:1.16.0-M3"))
api(platform("io.netty:netty-bom:4.2.6.Final"))
api(platform("io.projectreactor:reactor-bom:2025.0.0-M7"))
api(platform("io.projectreactor:reactor-bom:2025.0.0-SNAPSHOT"))
api(platform("io.rsocket:rsocket-bom:1.1.5"))
api(platform("org.apache.groovy:groovy-bom:5.0.0-rc-1"))
api(platform("org.apache.logging.log4j:log4j-bom:3.0.0-beta3"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -1170,7 +1171,7 @@ private class ReactiveCachingHandler {
if (invokeFailure.get()) {
return Mono.error(ex);
}
return (Mono) invokeOperation(invoker);
return (Mono) Objects.requireNonNull(invokeOperation(invoker));
}
catch (RuntimeException exception) {
return Mono.error(exception);
Expand Down Expand Up @@ -1201,8 +1202,8 @@ private class ReactiveCachingHandler {
}
if (adapter.isMultiValue()) {
return adapter.fromPublisher(Flux.from(Mono.fromFuture(cachedFuture))
.switchIfEmpty(Flux.defer(() -> (Flux) evaluate(null, invoker, method, contexts)))
.flatMap(v -> evaluate(valueToFlux(v, contexts), invoker, method, contexts))
.switchIfEmpty(Flux.defer(() -> (Flux) Objects.requireNonNull(evaluate(null, invoker, method, contexts))))
.flatMap(v -> Objects.requireNonNull(evaluate(valueToFlux(v, contexts), invoker, method, contexts)))
.onErrorResume(RuntimeException.class, ex -> {
try {
getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key);
Expand All @@ -1216,8 +1217,11 @@ private class ReactiveCachingHandler {
}
else {
return adapter.fromPublisher(Mono.fromFuture(cachedFuture)
.switchIfEmpty(Mono.defer(() -> (Mono) evaluate(null, invoker, method, contexts)))
.flatMap(v -> evaluate(Mono.justOrEmpty(unwrapCacheValue(v)), invoker, method, contexts))
// TODO: requireNonNull is not for
// free, perhaps we should
// suppress it
.switchIfEmpty(Mono.defer(() -> (Mono) Objects.requireNonNull(evaluate(null, invoker, method, contexts))))
.flatMap(v -> Objects.requireNonNull(evaluate(Mono.justOrEmpty(unwrapCacheValue(v)), invoker, method, contexts)))
.onErrorResume(RuntimeException.class, ex -> {
try {
getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.core.codec;

import java.util.Map;
import java.util.Objects;

import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -84,15 +85,16 @@ public int getMaxInMemorySize() {
public Flux<T> decode(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

return Flux.from(input).map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
return Flux.from(input).map(buffer ->
Objects.requireNonNull(decodeDataBuffer(buffer, elementType, mimeType, hints)));
}

@Override
public Mono<T> decodeToMono(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

return DataBufferUtils.join(input, this.maxInMemorySize)
.map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
return DataBufferUtils.join(input, this.maxInMemorySize).map(buffer ->
Objects.requireNonNull(decodeDataBuffer(buffer, elementType, mimeType, hints)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -227,7 +228,7 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
if (adapter != null && adapter.isMultiValue()) {
Flux<?> flux = content
.filter(this::nonEmptyDataBuffer)
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
.map(buffer -> Objects.requireNonNull(decoder.decode(buffer, elementType, mimeType, hints)))
.onErrorMap(ex -> handleReadError(parameter, message, ex));
if (isContentRequired) {
flux = flux.switchIfEmpty(Flux.error(() -> handleMissingBody(parameter, message)));
Expand All @@ -241,7 +242,7 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
// Single-value (with or without reactive type wrapper)
Mono<?> mono = content.next()
.filter(this::nonEmptyDataBuffer)
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
.map(buffer -> Objects.requireNonNull(decoder.decode(buffer, elementType, mimeType, hints)))
.onErrorMap(ex -> handleReadError(parameter, message, ex));
if (isContentRequired) {
mono = mono.switchIfEmpty(Mono.error(() -> handleMissingBody(parameter, message)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

import io.rsocket.Payload;
Expand Down Expand Up @@ -291,7 +292,7 @@ private <T> Mono<T> retrieveMono(ResolvableType elementType) {

Decoder<?> decoder = strategies.decoder(elementType, dataMimeType);
return (Mono<T>) payloadMono.map(this::retainDataAndReleasePayload)
.map(dataBuffer -> decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS));
.map(dataBuffer -> Objects.requireNonNull(decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS)));
}

@Override
Expand All @@ -317,7 +318,7 @@ private <T> Flux<T> retrieveFlux(ResolvableType elementType) {

Decoder<?> decoder = strategies.decoder(elementType, dataMimeType);
return payloadFlux.map(this::retainDataAndReleasePayload).map(dataBuffer ->
(T) decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS));
(T) Objects.requireNonNull(decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS)));
}

private Mono<Payload> getPayloadMono() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

import io.r2dbc.spi.Connection;
Expand Down Expand Up @@ -176,7 +177,7 @@ public Mono<? extends Connection> create() {
this.connection =
(isSuppressClose() ? getCloseSuppressingConnectionProxy(connectionToUse) : connectionToUse);
}
return this.connection;
return Objects.requireNonNull(this.connection);
}).flatMap(this::prepareConnection);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.transaction.interceptor;

import java.lang.reflect.Method;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -903,7 +904,7 @@ public Object invokeWithinTransaction(Method method, @Nullable Class<?> targetCl
createTransactionIfNecessary(rtm, txAttr, joinpointIdentification),
tx -> {
try {
return (Mono<?>) invocation.proceedWithInvocation();
return (Mono<?>) Objects.requireNonNull(invocation.proceedWithInvocation());
}
catch (Throwable ex) {
return Mono.error(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ abstract class AbstractCoroutinesTransactionAspectTests {

private fun checkReactiveTransaction(expected: Boolean) {
Mono.deferContextual{context -> Mono.just(context)}
.handle { context: ContextView, sink: SynchronousSink<Any?> ->
.handle { context: ContextView, sink: SynchronousSink<ContextView> ->
if (context.hasKey(TransactionContext::class.java) != expected) {
Fail.fail<Any>("Should have thrown NoTransactionException")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
return requestSender
.send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound)))
.responseConnection((response, connection) -> {
responseRef.set(new ReactorClientHttpResponse(response, connection));
return Mono.just((ClientHttpResponse) responseRef.get());
ReactorClientHttpResponse clientResponse = new ReactorClientHttpResponse(response, connection);
responseRef.set(clientResponse);
return Mono.just((ClientHttpResponse) clientResponse);
})
.next()
.doOnCancel(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ class KotlinSerializationCborEncoderTests : AbstractEncoderTests<KotlinSerializa
pojo3
)
val pojoBytes = Cbor.Default.encodeToByteArray(arrayOf(pojo1, pojo2, pojo3))
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer?> ->
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer> ->
step
.consumeNextWith(expectBytes(pojoBytes)
.andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) })
.andThen { dataBuffer: DataBuffer -> DataBufferUtils.release(dataBuffer) })
.verifyComplete()
}
}
Expand All @@ -78,7 +78,7 @@ class KotlinSerializationCborEncoderTests : AbstractEncoderTests<KotlinSerializa
fun encodeMono() {
val pojo = Pojo("foo", "bar")
val input = Mono.just(pojo)
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer?> ->
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer> ->
step
.consumeNextWith(expectBytes(Cbor.Default.encodeToByteArray(pojo))
.andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ class CustomKotlinSerializationJsonEncoderTests :
@Test
override fun encode() {
val input = Mono.just(BigDecimal(1))
testEncode(input, BigDecimal::class.java) { step: StepVerifier.FirstStep<DataBuffer?> ->
testEncode(input, BigDecimal::class.java) { step: StepVerifier.FirstStep<DataBuffer> ->
step.consumeNextWith(expectString("1.0")
.andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) })
.andThen { dataBuffer: DataBuffer -> DataBufferUtils.release(dataBuffer) })
.verifyComplete()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ class KotlinSerializationProtobufEncoderTests : AbstractEncoderTests<KotlinSeria
pojo3
)
val pojoBytes = ProtoBuf.Default.encodeToByteArray(arrayOf(pojo1, pojo2, pojo3))
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer?> ->
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer> ->
step
.consumeNextWith(expectBytes(pojoBytes)
.andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) })
.andThen { dataBuffer: DataBuffer -> DataBufferUtils.release(dataBuffer) })
.verifyComplete()
}
}
Expand All @@ -88,10 +88,10 @@ class KotlinSerializationProtobufEncoderTests : AbstractEncoderTests<KotlinSeria
fun encodeMono() {
val pojo = Pojo("foo", "bar")
val input = Mono.just(pojo)
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer?> ->
testEncode(input, Pojo::class.java) { step: FirstStep<DataBuffer> ->
step
.consumeNextWith(expectBytes(ProtoBuf.Default.encodeToByteArray(pojo))
.andThen { dataBuffer: DataBuffer? -> DataBufferUtils.release(dataBuffer) })
.andThen { dataBuffer: DataBuffer -> DataBufferUtils.release(dataBuffer) })
.verifyComplete()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,10 @@ protected NamedValueInfo createNamedValueInfo(MethodParameter parameter) {
return new NamedValueInfo(ann.name(), ann.required(), ValueConstants.DEFAULT_NONE);
}

@SuppressWarnings("NullAway") // https://github.com/uber/NullAway/issues/1290
@Override
protected Mono<Object> resolveName(String name, MethodParameter parameter, ServerWebExchange exchange) {
return exchange.getSession()
.filter(session -> session.getAttribute(name) != null)
.map(session -> session.getAttribute(name));
return exchange.getSession().mapNotNull(session -> session.getAttribute(name));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,8 @@ class CoRouterFunctionDsl internal constructor (private val init: (CoRouterFunct
}

@PublishedApi
internal fun <T> asMono(request: ServerRequest, context: CoroutineContext = Dispatchers.Unconfined, handler: suspend (ServerRequest) -> T): Mono<T> {
internal fun <T : Any> asMono(request: ServerRequest, context: CoroutineContext =
Dispatchers.Unconfined, handler: suspend (ServerRequest) -> T?): Mono<T> {
return mono(context) {
contextProvider?.let {
withContext(it.invoke(request)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,8 @@ class InvocableHandlerMethodKotlinTests {
StepVerifier.create(mono)
.consumeNextWith {
if (it.returnValue is Mono<*>) {
StepVerifier.create(it.returnValue as Mono<*>).expectNext(expected).verifyComplete()
StepVerifier.create(it.returnValue as Mono<*>).expectNext(requireNotNull(expected))
.verifyComplete()
} else {
assertThat(it.returnValue).isEqualTo(expected)
}
Expand Down