31
31
import java .util .Optional ;
32
32
import java .util .function .Function ;
33
33
import org .reactivestreams .Publisher ;
34
- import org .slf4j .Logger ;
35
- import org .slf4j .LoggerFactory ;
36
34
import reactor .core .Exceptions ;
37
35
import reactor .core .publisher .Flux ;
38
36
import reactor .core .publisher .Mono ;
@@ -46,7 +44,8 @@ public class ServiceCall implements AutoCloseable {
46
44
private ServiceClientErrorMapper errorMapper = DefaultErrorMapper .INSTANCE ;
47
45
private Map <String , String > credentials = Collections .emptyMap ();
48
46
private String contentType = ServiceMessage .DEFAULT_DATA_FORMAT ;
49
- private Logger logger ;
47
+
48
+ // private Logger logger;
50
49
51
50
public ServiceCall () {}
52
51
@@ -57,7 +56,6 @@ private ServiceCall(ServiceCall other) {
57
56
this .errorMapper = other .errorMapper ;
58
57
this .contentType = other .contentType ;
59
58
this .credentials = Collections .unmodifiableMap (new HashMap <>(other .credentials ));
60
- this .logger = other .logger ;
61
59
}
62
60
63
61
/**
@@ -144,42 +142,6 @@ public ServiceCall contentType(String contentType) {
144
142
return target ;
145
143
}
146
144
147
- /**
148
- * Setter for {@link ServiceCall} {@code logger}.
149
- *
150
- * @param name logger name (optional)
151
- * @return new {@link ServiceCall} instance.
152
- */
153
- public ServiceCall logger (String name ) {
154
- ServiceCall target = new ServiceCall (this );
155
- target .logger = name != null ? LoggerFactory .getLogger (name ) : null ;
156
- return target ;
157
- }
158
-
159
- /**
160
- * Setter for {@link ServiceCall} {@code logger}.
161
- *
162
- * @param clazz logger name (optional)
163
- * @return new {@link ServiceCall} instance.
164
- */
165
- public ServiceCall logger (Class <?> clazz ) {
166
- ServiceCall target = new ServiceCall (this );
167
- target .logger = clazz != null ? LoggerFactory .getLogger (clazz ) : null ;
168
- return target ;
169
- }
170
-
171
- /**
172
- * Setter for {@link ServiceCall} {@code logger}.
173
- *
174
- * @param logger logger (optional)
175
- * @return new {@link ServiceCall} instance.
176
- */
177
- public ServiceCall logger (Logger logger ) {
178
- ServiceCall target = new ServiceCall (this );
179
- target .logger = logger ;
180
- return target ;
181
- }
182
-
183
145
/**
184
146
* Invokes fire-and-forget request.
185
147
*
@@ -209,50 +171,37 @@ public Mono<ServiceMessage> requestOne(ServiceMessage request) {
209
171
*/
210
172
public Mono <ServiceMessage > requestOne (ServiceMessage request , Type responseType ) {
211
173
return Mono .defer (
212
- () -> {
213
- ServiceMethodInvoker methodInvoker ;
214
- if (serviceRegistry != null
215
- && (methodInvoker = serviceRegistry .lookupInvoker (request )) != null ) {
216
- // local service
217
- return methodInvoker
218
- .invokeOne (request )
219
- .map (this ::throwIfError )
220
- .contextWrite (
221
- context -> {
222
- if (context .hasKey (RequestContext .class )) {
223
- return context ;
224
- } else {
225
- return new RequestContext (context )
226
- .headers (request .headers ())
227
- .request (request )
228
- .principal (NULL_PRINCIPAL );
229
- }
230
- });
231
- } else {
232
- // remote service
233
- Objects .requireNonNull (transport , "[requestOne] transport" );
234
- return Mono .fromCallable (() -> serviceLookup (request ))
235
- .flatMap (
236
- serviceReference ->
237
- transport
238
- .create (serviceReference )
239
- .requestResponse (request , responseType )
240
- .map (this ::throwIfError ));
241
- }
242
- })
243
- .doOnSuccess (
244
- response -> {
245
- if (logger != null && logger .isDebugEnabled ()) {
246
- logger .debug (
247
- "[{}] request: {}, response: {}" , request .qualifier (), request , response );
248
- }
249
- })
250
- .doOnError (
251
- ex -> {
252
- if (logger != null ) {
253
- logger .error ("[{}][error] request: {}" , request .qualifier (), request , ex );
254
- }
255
- });
174
+ () -> {
175
+ ServiceMethodInvoker methodInvoker ;
176
+ if (serviceRegistry != null
177
+ && (methodInvoker = serviceRegistry .lookupInvoker (request )) != null ) {
178
+ // local service
179
+ return methodInvoker
180
+ .invokeOne (request )
181
+ .map (this ::throwIfError )
182
+ .contextWrite (
183
+ context -> {
184
+ if (context .hasKey (RequestContext .class )) {
185
+ return context ;
186
+ } else {
187
+ return new RequestContext (context )
188
+ .headers (request .headers ())
189
+ .request (request )
190
+ .principal (NULL_PRINCIPAL );
191
+ }
192
+ });
193
+ } else {
194
+ // remote service
195
+ Objects .requireNonNull (transport , "[requestOne] transport" );
196
+ return Mono .fromCallable (() -> serviceLookup (request ))
197
+ .flatMap (
198
+ serviceReference ->
199
+ transport
200
+ .create (serviceReference )
201
+ .requestResponse (request , responseType )
202
+ .map (this ::throwIfError ));
203
+ }
204
+ });
256
205
}
257
206
258
207
/**
@@ -274,55 +223,37 @@ public Flux<ServiceMessage> requestMany(ServiceMessage request) {
274
223
*/
275
224
public Flux <ServiceMessage > requestMany (ServiceMessage request , Type responseType ) {
276
225
return Flux .defer (
277
- () -> {
278
- ServiceMethodInvoker methodInvoker ;
279
- if (serviceRegistry != null
280
- && (methodInvoker = serviceRegistry .lookupInvoker (request )) != null ) {
281
- // local service
282
- return methodInvoker
283
- .invokeMany (request )
284
- .map (this ::throwIfError )
285
- .contextWrite (
286
- context -> {
287
- if (context .hasKey (RequestContext .class )) {
288
- return context ;
289
- } else {
290
- return new RequestContext (context )
291
- .headers (request .headers ())
292
- .request (request )
293
- .principal (NULL_PRINCIPAL );
294
- }
295
- });
296
- } else {
297
- // remote service
298
- Objects .requireNonNull (transport , "[requestMany] transport" );
299
- return Mono .fromCallable (() -> serviceLookup (request ))
300
- .flatMapMany (
301
- serviceReference ->
302
- transport
303
- .create (serviceReference )
304
- .requestStream (request , responseType )
305
- .map (this ::throwIfError ));
306
- }
307
- })
308
- .doOnSubscribe (
309
- s -> {
310
- if (logger != null && logger .isDebugEnabled ()) {
311
- logger .debug ("[{}][subscribe] request: {}" , request .qualifier (), request );
312
- }
313
- })
314
- .doOnComplete (
315
- () -> {
316
- if (logger != null && logger .isDebugEnabled ()) {
317
- logger .debug ("[{}][complete] request: {}" , request .qualifier (), request );
318
- }
319
- })
320
- .doOnError (
321
- ex -> {
322
- if (logger != null ) {
323
- logger .error ("[{}][error] request: {}" , request .qualifier (), request , ex );
324
- }
325
- });
226
+ () -> {
227
+ ServiceMethodInvoker methodInvoker ;
228
+ if (serviceRegistry != null
229
+ && (methodInvoker = serviceRegistry .lookupInvoker (request )) != null ) {
230
+ // local service
231
+ return methodInvoker
232
+ .invokeMany (request )
233
+ .map (this ::throwIfError )
234
+ .contextWrite (
235
+ context -> {
236
+ if (context .hasKey (RequestContext .class )) {
237
+ return context ;
238
+ } else {
239
+ return new RequestContext (context )
240
+ .headers (request .headers ())
241
+ .request (request )
242
+ .principal (NULL_PRINCIPAL );
243
+ }
244
+ });
245
+ } else {
246
+ // remote service
247
+ Objects .requireNonNull (transport , "[requestMany] transport" );
248
+ return Mono .fromCallable (() -> serviceLookup (request ))
249
+ .flatMapMany (
250
+ serviceReference ->
251
+ transport
252
+ .create (serviceReference )
253
+ .requestStream (request , responseType )
254
+ .map (this ::throwIfError ));
255
+ }
256
+ });
326
257
}
327
258
328
259
/**
@@ -353,7 +284,20 @@ public Flux<ServiceMessage> requestBidirectional(
353
284
if (serviceRegistry != null
354
285
&& (methodInvoker = serviceRegistry .lookupInvoker (request )) != null ) {
355
286
// local service
356
- return methodInvoker .invokeBidirectional (messages ).map (this ::throwIfError );
287
+ return methodInvoker
288
+ .invokeBidirectional (messages )
289
+ .map (this ::throwIfError )
290
+ .contextWrite (
291
+ context -> {
292
+ if (context .hasKey (RequestContext .class )) {
293
+ return context ;
294
+ } else {
295
+ return new RequestContext (context )
296
+ .headers (request .headers ())
297
+ .request (request )
298
+ .principal (NULL_PRINCIPAL );
299
+ }
300
+ });
357
301
} else {
358
302
// remote service
359
303
Objects .requireNonNull (transport , "[requestBidirectional] transport" );
0 commit comments